Предисловие
На сегодняшний день в текущей версии Python SDK 2.9.0 функция облака Google PubSub все еще активно развивается и в настоящее время предназначена только для потоковой обработки, поэтому их нельзя использовать как часть.вашего конвейера для публикации в PubSub.
Вы все еще можете установить клиент Google PubSub, но вы наткнетесь на некоторые ограничения и нюансы из-за особенностей времени выполнения луча Apache, таких как:
- Вам нужно будет установить
google-cloud-pubsub
и указать свою зависимость для Apache beam - Преодолеть несовместимость сериализации установленного клиента PubSub для конвейера Apache Beam
- Каким-то образом явно предоставить учетные данные для аутентификации клиента PubSub
Вот базовое пошаговое руководство для получения конвейера, способного публиковать элементы из обработанной коллекции в облачный сервис PubSub Google.
Давайте предположим базовую структуру:
├── my_stuff
│ ├── __init__.py
│ └── my_package.py
├── .gitignore
├── main.py
├── README.md
└── setup.py
Получить клиент PubSub
Установить google-cloud-pubsub (при условии pip: pip install google-cloud-pubsub
), и теперь вы столкнетесь с ударом , предоставляющим зависимости , и я предлагаю следовать последнему разделу документации и предоставить setup.py
некоторые метаданные и зависимость вашего пакета:
from setuptools import find_packages, setup
setup(
name="my_stuff",
version="0.1.0",
description="My Pipeline for DirectRunner that publishes to Google Cloud PubSub",
long_description=open("README.md").read(),
classifiers=[
"Programming Language :: Python",
],
author="John Doe",
author_email="john.doe@example.com",
url="https://example.com/",
license="proprietary",
packages=find_packages(),
include_package_data=True,
zip_safe=True,
install_requires=['google-cloud-pubsub==0.35.4'],
)
Вы можете использовать pip freeze | grep google-cloud-pubsub
, чтобы получить точно установленную версию.
Сделать PubSub Client Serializable
Если вы просто попытаетесь Map
опубликовать функцию, имеющую экземпляр клиента PubSub, то вы получите странную ошибку от Apache Beam, говорящую о том, что она не может десериализовать ее.Чтобы преодолеть это, вы можете создать вызываемый класс и реализовать несколько методов, следуя документации pickle , чтобы преодолеть проблему сериализации.
Вот базовый пример издателя PubSub, создающего экземпляр клиента:
class Publisher(object):
"""
PubSub publisher for the beam pipeline.
"""
@staticmethod
def init_client():
return pubsub_v1.PublisherClient(credentials='TODO: Get credentials')
def __init__(self, topic):
self.topic = topic
self.client = self.init_client()
def __getstate__(self):
return self.topic
def __setstate__(self, topic):
self.topic = topic
self.client = self.init_client()
def __call__(self, item, *args, **kwargs):
self.client.publish(self.topic, b'{}'.format(item))
Укажите учетные данные
Из вопроса не было ясно, есть ли необходимость повторно использовать учетные данные времени выполнения конвейера или их необходимо указывать отдельно.Есть несколько способов указать учетные данные.Вы можете создать их, используя service_account.Credentials
или повторно использовать учетные данные из среды выполнения, используя GoogleCredentials
.
Жестко закодированные учетные данные
from google.cloud import pubsub_v1
from google.oauth2 import service_account
client1 = pubsub_v1.PublisherClient(
credentials=service_account.Credentials.from_service_account_info({
"type": "service_account",
"project_id": "****",
"private_key_id": "****",
"private_key": "-----BEGIN PRIVATE KEY-----\n****\n-----END PRIVATE KEY-----\n",
"client_email": "****",
"client_id": "****",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/***"
})
)
Учетные данные из файла JSON (через переменную среды ОС)
from google.cloud import pubsub_v1
from google.oauth2 import service_account
import os
client2 = pubsub_v1.PublisherClient(
credentials=service_account.Credentials.from_service_account_file(
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
)
)
Повторное использование учетных данных времени выполнения (используется для выполнения конвейера)
from google.cloud import pubsub_v1
from oauth2client.client import GoogleCredentials
client3 = pubsub_v1.PublisherClient(
credentials=GoogleCredentials.get_application_default()
)
Использование
Теперь вы можете использовать Publisher
в своем конвейере, как любое другое преобразование:
published = (pipeline | "Publish" >> beam.Map(Publisher("pub/sub/topic")))
Только не забудьте, что вам нужно будет добавить аргумент --setup_file /absolute/path/to/setup.py
для конвейера.