Как PubSub из не потокового потока данных - PullRequest
0 голосов
/ 07 июня 2018

Я хочу уведомить GAE о завершении задания DataFlow.

Итак, я пытаюсь создать следующий конвейер:

from apache_beam.io.gcp.pubsub import WriteStringsToPubSub

  (
      p | ReadFromDatastore(google_cloud_options.project, query)
...snip...
      | 'send to pubsub' >> WriteStringsToPubSub(GCS_TOPIC)
  )

Но приведенный выше код вызывает следующую ошибку:

ValueError: PubSubPayloadSink в настоящее время доступен для использования только в потоковых конвейерах.

Как уведомить приложение GAE о завершении задания?Должен ли я использовать GCPClinentLibrary?(из google.cloud import pubsub_v1)

1 Ответ

0 голосов
/ 21 июня 2018

Вы должны развернуть свой код, используя представление Pcollection для своего конвейера.Обмен сообщениями Pubsub (как вы делали с помощью PubsubIO).это один из способов общения с GAE.Пожалуйста, следуйте здесь , чтобы получить более подробную информацию.Простой код может быть следующим:

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

Код Python также может быть записан следующим образом (pcol должен быть помещен после всех stes, включая ptransform, Pardo и т. Д.):

from apache_beam.io.gcp.pubsub import WriteStringsToPubSub
pcoll = (p
             | ReadStringsFromPubSub('projects/fakeprj/topics/baz')
             | beam.Map(lambda x: x))
             | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')

Для более подробной информации, пожалуйста, просмотрите здесь .

Обратите внимание, что конвейер также можно запустить из приложения Python GAE.Таким образом, вам нужно просто добавить команду уведомления после следующих строк (чтобы убедиться, что задание завершено и можно использовать функцию wait_until_finish ().):

  results = p.run()
  result.wait_until_finish()

Вы также можете использовать grpc.Для более подробной информации, пожалуйста, прочитайте здесь .

Также могут использоваться протоколы AMQP, такие как клиентские коды Python RabbitMQ и ZeroMQ.

...