Пользовательская временная метка и управление окнами для Pub / Sub в DataFlow (Apache Beam) - PullRequest
0 голосов
/ 28 марта 2019

Я хочу реализовать следующий сценарий с использованием потокового конвейера в Apache Beam (и запуск его в Google DataFlow):

  1. Чтение сообщений из Pub / Sub (строки JSON)
  2. Десериализация JSON
  3. Использование настраиваемого поля (скажем, timeStamp) в качестве значения метки времени для элемента обработки
  4. Применение фиксированного управления окнами для 60 seconds
  5. Извлечение ключа из элементов и группыпо ключу
  6. << выполнить дальнейшую обработку >>

Я пытался решить эту проблему, используя как Java (Scala), так и Python, но ни одно из решений не сработало.

  1. Решение Python
# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
        | beam.Map(add_timestamping)
        | beam.WindowInto(window.FixedWindows(60))
        | beam.Map(lambda elem: elem) # exracting the key somehow, not relevant here
        | beam.GroupByKey()
        # (...)
        | beam.io.WriteToPubSub("output_topic")
        )
p.run()

add_timestamping функция согласно документация :

def add_timestamping(elem):
    import json
    import apache_beam as beam
    msg = json.loads(elem)
    unix_timestamp = msg['timeStamp'] / 1000
    return beam.window.TimestampedValue(msg, unix_timestamp)

Вывод решения Python :

  1. При использовании DirectRunner окна испускаются, и само окно более или менее подходит в зависимости от задержки.
  2. При использовании DataFlowRunner, ВСЕ сообщения пропускаются со счетчиком, появляющимся в интерфейсе DataFlow: dropDueToLateness .

Решение Java / Scala (я использовал Scio , но это также происходит в чистом Beam SDK и в Java)
sc.pubsubSubscription[String]("my_sub")
    .applyTransform(ParDo.of(new CustomTs()))
    .withFixedWindows(Duration.standardSeconds(60))
    .map(x => x) // exracting the key somehow, not relevant here
    .groupByKey
    // (...)
    .saveAsPubsub("output_topic")

Добавлениепользовательская отметка времени согласно документации :

import io.circe.parser._
class CustomTs extends DoFn[String, String] {
  @ProcessElement
  def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
    val json = parse(element).right.get
    val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
    out.outputWithTimestamp(element, new Instant(timestampMillis))
  }
}

Вывод решения Java / Scala :

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.IllegalArgumentException:
 Cannot output with timestamp 2019-03-02T00:51:39.124Z. 
 Output timestamps must be no earlier than the timestamp of the current input
 (2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).

Я не могу использовать DoFn.getAllowedTimestampSkew здесьпоскольку это уже устарело, и я не знаю, какие диапазоны исторических данных будут отправлены.


Возможность обработки исторических данных имеет решающее значение для моего проекта (эти данные будут отправлены в Pub / Subиз какого-то магазина).Конвейер должен работать как с текущими данными, так и с историческими.

Мой вопрос: Как обрабатывать данные с использованием пользовательских временных меток с возможностью работы с окнами, определенными с помощью API Beam?

1 Ответ

1 голос
/ 29 марта 2019

Если у вас есть возможность извлечь отметку времени в точке вставки в PubSub, вы сможете использовать указанные пользователем отметки времени в качестве метаданных.Информация о том, как это описано в 1.9 SDK.

https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids

"Вы можете использовать указанные пользователем временные метки для точного контроля над тем, как элементы, считанные из Cloud Pub / Sub, назначаются окнам в конвейере потока данных."

Поскольку 1.9 устарела, в 2.11 вам понадобится https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...