Я использую TextIO для чтения из облачного хранилища. Поскольку я хочу, чтобы работа выполнялась постоянно, я использую watchForNewFiles.
Для полноты данных, которые я читаю, работает нормально, если я использую ограниченные PCollections (без watchForNewFiles и BigQueryIO в пакетном режиме), поэтому проблема с данными отсутствует.
У меня есть p.run (). WaitUntilFinish (); в моем коде, так что конвейер работает. И это не дает никакой ошибки.
Версия луча Apache 2.8.0
PCollection<String> stream =
p.apply("Read File", TextIO
.read()
.from(options.getInput())
.watchForNewFiles(
Duration.standardMinutes(1),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
)
.withCompression(Compression.AUTO));
Это прекрасно работает и читает файлы, как только они становятся доступны. PCollection не ограничен и содержит строки текста из этих файлов.
После некоторых преобразований
PCollection<List<String>> lines = stream.apply("Parse CSV",
ParDo.of(new ParseCSV())
);
PCollection<TableRow> rows = lines.apply("Convert to BQ",
ParDo.of(new BigQueryConverter(schema))
);
Шаг ParseCSV добавляет временные метки к своему получателю через outputWithTimestamp.
Я получаю коллекцию таблиц TableRows, готовых для потоковой передачи в BigQuery.
Для этого я использую
WriteResult result = rows.apply("WriteToBigQuery",
BigQueryIO.
<TableRow>write()
.withFormatFunction(input -> input)
.withSchema(bqSchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(options.getOutput())
);
Это никогда не записывает данные в BigQuery. Если я загляну в пользовательский интерфейс, я вижу, что BigQueryIO делает
- ShardTableWrites
- TagWithUniqueId
- Перестановки
Данные входят и выходят из первых двух шагов. Но никогда не перестановки. Это только читает данные, но никогда не передает данные. Шаг внутри Reshuffle, который вызывает это GroupByKey.
Поскольку коллекция не ограничена, я попытался настроить окно с
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
);
, что должно заставить все, что делает GroupByKey, освободить окно через 10 секунд. Но это не так.
lines = lines.apply(Window.configure()
.<List<String>>into(FixedWindows
.of(Duration.standardSeconds(10))
)
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
.withAllowedLateness(Duration.standardSeconds(0))
.discardingFiredPanes()
);
Добавление определенного триггера на время обработки также не помогло.
Любая подсказка? Заранее спасибо!