Я построил потоковый конвейер потока данных с помощью Python SDK (Apache Beam Python 3.7 SDK 2.19.0). Представление исходных данных:
| Phone Number | Call length |
|--------------|-------------|
| 1234 | 6 |
| 1234 | 2 |
| 5678 | 5 |
Идея состоит в том, чтобы найти среднюю продолжительность телефонного звонка для номера в каждой строке для данного окна. Данные считываются как строки CSV из Pub / Sub, и я добавляю значение ко всем строкам, соответствующим средней длине вызова номера:
| Phone Number | Call length | mean call length |
|--------------|-------------|------------------|
| 1234 | 6 | 4 |
| 1234 | 2 | 4 |
| 5678 | 5 | 5 |
Я использую следующий конвейер:
with beam.Pipeline(options=pipeline_options) as pipeline:
calls = (pipeline
| 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=input_sub)
| 'byte_to_string' >> beam.Map(lambda x: x.decode("utf-8"))
| 'windows' >> beam.WindowInto(window.FixedWindows(10))
)
mean_call_length = (calls
| 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
| 'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
)
recombine = (calls
| 'Create dictionary from raw string' >> beam.ParDo(SplitToDict())
| 'Add mean' >> beam.FlatMap(combine_calcs,pvalue.AsList(mean_call_length))
| 'encode to bytes' >> beam.Map(lambda x: str(x).encode())
| 'write to output topic' >> beam.io.WriteToPubSub(topic=output_topic)
)
Это прекрасно работает локально (с DirectRunner), но не работает при запуске в GCP (DataflowRunner). Кажется, это также работает нормально, когда я вычисляю только 1 из частоты числа или средней длины вызова.
Я вижу исключение java в журналах потока данных, которое содержит:
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
Что похоже на исключение конца файла, связанное с потоковой передачей.
Конвейер визуализируется в потоке данных здесь:
Есть идеи?