Сбой потока данных при объединении нескольких сторонних входов в потоковый конвейер - PullRequest
0 голосов
/ 20 февраля 2020

Я построил потоковый конвейер потока данных с помощью Python SDK (Apache Beam Python 3.7 SDK 2.19.0). Представление исходных данных:

| Phone Number | Call length |
|--------------|-------------|
| 1234         | 6           |
| 1234         | 2           |
| 5678         | 5           |

Идея состоит в том, чтобы найти среднюю продолжительность телефонного звонка для номера в каждой строке для данного окна. Данные считываются как строки CSV из Pub / Sub, и я добавляю значение ко всем строкам, соответствующим средней длине вызова номера:

| Phone Number | Call length | mean call length |
|--------------|-------------|------------------|
| 1234         | 6           | 4                |
| 1234         | 2           | 4                |
| 5678         | 5           | 5                |

Я использую следующий конвейер:

    with beam.Pipeline(options=pipeline_options) as pipeline:
        calls = (pipeline
             | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=input_sub)
             | 'byte_to_string' >> beam.Map(lambda x: x.decode("utf-8"))
             | 'windows' >> beam.WindowInto(window.FixedWindows(10))   
            )

        mean_call_length = (calls
             | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
             |  'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
            )

        recombine = (calls
              | 'Create dictionary from raw string' >> beam.ParDo(SplitToDict())
              |  'Add mean' >> beam.FlatMap(combine_calcs,pvalue.AsList(mean_call_length))
              | 'encode to bytes' >> beam.Map(lambda x: str(x).encode())
              | 'write to output topic' >> beam.io.WriteToPubSub(topic=output_topic)
            )

Это прекрасно работает локально (с DirectRunner), но не работает при запуске в GCP (DataflowRunner). Кажется, это также работает нормально, когда я вычисляю только 1 из частоты числа или средней длины вызова.

Я вижу исключение java в журналах потока данных, которое содержит:

Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException 

Что похоже на исключение конца файла, связанное с потоковой передачей.

Конвейер визуализируется в потоке данных здесь:

enter image description here

Есть идеи?

1 Ответ

1 голос
/ 25 февраля 2020

Я обошел эту проблему, преобразовав результаты средних вычислений в целые числа, изменив конвейер:

...
mean_call_length = (calls
             | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
             |  'mean_call_length_per_number' >> beam.combiners.Mean.PerKey())
             | 'convert_mean_to_int' >> beam.Map(lambda elem: (elem[0],int(elem[1])))
...

Кажется, что между Python SDK и базовым * 1006 возникла некоторая проблема с типизацией * код; код Java, по-видимому, ожидает, что элемент [1] будет иметь определенное количество байтов, которое превышается, если вы передаете float через Python SDK.

...