Поток данных Apache Beam Streaming застрял при чтении миджей из PubSub - PullRequest
0 голосов
/ 18 марта 2019

Существует странная ситуация с Apache Beam Python 2.7 SDK 2.11.0 для довольно простого конвейера потока данных, когда необходимо разбирать сообщения из подписки PubSub на несколько таблиц BiqQuery.

Нетошибки и исключения, и конвейер потока данных, кажется, работает в консоли монитора (как показано ниже), но начальный шаг «Read PubSub Message» выполняется бесконечно, не выпуская сообщения чтения для следующего преобразования.

Pipeline

Это происходит только с DataflowRunner , но прекрасно работает с DirectRunner , поэтому в потоке данных он просто продолжает накапливать сообщения из PubSub и медленно раздувать память без дальнейшего продвижения.

Трубопровод определяется следующим образом:

with b.Pipeline(options=pipeline_options) as p:
    metrics, failure \
        = (p
           | 'Read PubSub Message' >> b.io.ReadFromPubSub(subscription=config.incoming, with_attributes=True)
           | 'Parse Telemetry' >> b.FlatMap(parse, Monitor(config)).with_outputs('SUCCESS', 'FAILURE')
           )

    (metrics | 'Store Metrics' >> b.io.WriteToBigQuery(**raw_table))
    (failure | 'Store Failures' >> b.io.WriteToBigQuery(**err_table))

И анализ дает два помеченных выхода

def parse(message, monitor):  # type: (PubsubMessage, Monitor) -> None
    """
    Args:
        message (PubsubMessage): Telemetry message from a PubSub
        monitor (Monitor): Pipeline KPI monitor
    """
    try:
        metric = RawMetric.from_pub_sub(message)  # type: RawMetric
        yield pv.TaggedOutput(tag='SUCCESS', value=metric.for_json())
        try:
            monitor.count(metric)
        except Exception as error:
            yield pv.TaggedOutput(tag='FAILURE', value="monitor screw up")
    except Exception as error:
        yield pv.TaggedOutput(tag='FAILURE', value="RawMetric screw up")

Я что-то пропустил?

...