При использовании неограниченной PCollection из TextIO в BigQuery данные застряли в Reshuffle / GroupByKey внутри BigQueryIO. - PullRequest
0 голосов
/ 12 ноября 2018

Я использую TextIO для чтения из облачного хранилища. Поскольку я хочу, чтобы работа выполнялась постоянно, я использую watchForNewFiles.

Для полноты данных, которые я читаю, работает нормально, если я использую ограниченные PCollections (без watchForNewFiles и BigQueryIO в пакетном режиме), поэтому проблема с данными отсутствует.

У меня есть p.run (). WaitUntilFinish (); в моем коде, так что конвейер работает. И это не дает никакой ошибки.

Версия луча Apache 2.8.0

PCollection<String> stream =
        p.apply("Read File", TextIO
                .read()
                .from(options.getInput())
                .watchForNewFiles(
                        Duration.standardMinutes(1),
                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
                )
                .withCompression(Compression.AUTO));

Это прекрасно работает и читает файлы, как только они становятся доступны. PCollection не ограничен и содержит строки текста из этих файлов.

После некоторых преобразований

PCollection<List<String>> lines = stream.apply("Parse CSV",
        ParDo.of(new ParseCSV())
);

PCollection<TableRow> rows = lines.apply("Convert to BQ",
        ParDo.of(new BigQueryConverter(schema))
);

Шаг ParseCSV добавляет временные метки к своему получателю через outputWithTimestamp.

Я получаю коллекцию таблиц TableRows, готовых для потоковой передачи в BigQuery. Для этого я использую

WriteResult result = rows.apply("WriteToBigQuery",
        BigQueryIO.
                <TableRow>write()
                .withFormatFunction(input -> input)
                .withSchema(bqSchema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withExtendedErrorInfo()
                .to(options.getOutput())

);

Это никогда не записывает данные в BigQuery. Если я загляну в пользовательский интерфейс, я вижу, что BigQueryIO делает

  • ShardTableWrites
  • TagWithUniqueId
  • Перестановки
    • Window.into
    • GroupByKey

Данные входят и выходят из первых двух шагов. Но никогда не перестановки. Это только читает данные, но никогда не передает данные. Шаг внутри Reshuffle, который вызывает это GroupByKey.

Поскольку коллекция не ограничена, я попытался настроить окно с

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
);

, что должно заставить все, что делает GroupByKey, освободить окно через 10 секунд. Но это не так.

lines = lines.apply(Window.configure()
        .<List<String>>into(FixedWindows
                .of(Duration.standardSeconds(10))
        )
        .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
        .withAllowedLateness(Duration.standardSeconds(0))
        .discardingFiredPanes()
);

Добавление определенного триггера на время обработки также не помогло. Любая подсказка? Заранее спасибо!

1 Ответ

0 голосов
/ 14 июня 2019

Один из способов - это (который мне помог) назначить новый ключ для каждого элемента и заставить поток данных разъединять преобразования с помощью Reshuffle или GroupByKey.

streams.apply(WithKeys.of(input -> 1)).setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
       .apply(Reshuffle.of())
       .apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
           @Override
           public String apply(KV<Integer, String> input) {
               return input.getValue();
           }
       }))
       .apply("convertToTableRow", ...)
       .apply("WriteToBigQuery", ...)

Ключ может быть константойкак в примере или случайном.Если вы выбираете случайное, то вы должны установить диапазон достаточно мал, чтобы поместиться в память JVM.Нравится ThreadLocalRandom.current().nextInt(0, 5000)

...