Я хотел бы реализовать очень простой лучевой конвейер:
read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery.
В Apache Beam предварительно реализован PTransform для каждого процесса.
Таким образом, конвейер будет:
Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")
Однако ReadAllFromText
() как-то блокирует конвейер. Создание пользовательского PTransform, который возвращает случайную строку после чтения из PubSub и записи ее в таблицу BigQuery, работает нормально (без блокировки). Добавление фиксированного окна в 3 секунды или запуск каждого элемента также не решает проблему.
Каждый файл имеет размер около 10 МБ и 23 тыс. Строк.
К сожалению, я не могу найти документацию о том, как ReadAllFromText
должен работать. Было бы очень странно, если бы он попытался заблокировать конвейер, пока не прочитает все файлы. И я ожидаю, что функция будет выдвигать каждую строку в конвейер, как только она прочитает строку.
Есть ли известная причина для такого поведения? Это ошибка или я что-то не так делаю?
Код трубопровода:
pipeline_options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromPubSub(subscription=source_dict["input"]) \
| 'window' >> beam.WindowInto(window.FixedWindows(3, 0)) \
| ReadAllFromText(skip_header_lines=1)
elements = lines | beam.ParDo(SplitPayload())
elements | WriteToBigQuery(source_dict["output"], write_disposition=BigQueryDisposition.WRITE_APPEND)
.
.
.
class SplitPayload(beam.DoFn):
def process(self, element, *args, **kwargs):
timestamp, id, payload = element.split(";")
return [{
'timestamp': timestamp,
'id': id,
'payload': payload
}]