Поток данных - Windowed пишет в BigQuery? - PullRequest
0 голосов
/ 29 сентября 2018

Поток данных - есть ли оконные записи в BigQuery?Я пытаюсь запустить задание Dataflow, которое читает 500 миллионов строк файлов, а затем пишет в BigQuery.Когда я бежал, он не превышал 15 миллионов, поэтому я хотел бы посмотреть, поможет ли какой-либо тип оконной записи в BigQuery.Во время работы я получил много ошибок GC Allocation, но я вижу, что это нормально.Я оставил стандартный размер диска, настроенный при запуске.Пожалуйста помоги.Если есть какие-либо примеры для оконной записи в BigQuery, укажите.

Что касается преобразования, то это просто разделение строки, а затем вставка в BigQuery.

Кроме того, приведенный ниже пример продолжает запись в BigQuery, поскольку он продолжает потоковую передачу из PubSub?https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java

Мой образец ниже

Pipeline pipeline = Pipeline.create(options);
        PCollection<String> textData = pipeline.apply("Read Text Data",
                TextIO.read().from(options.getInputFilePattern()));
        PCollection<TableRow> tr = textData.apply(ParDo.of(new FormatRemindersFn()));

        tr.apply(BigQueryIO.writeTableRows().withoutValidation()              .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .withSchema(FormatRemindersFn.getSchema())
                //  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .to(options.getSchemaDetails()));

 static class FormatRemindersFn extends DoFn<String, TableRow> {
  @ProcessElement
        public void processElement(ProcessContext c) {
            try {
                if (StringUtils.isNotEmpty(c.element())) {
                    String[] fields = c.element().split("\\^",15);

                  //  logger.info("Fields :{}", fields[2]);
                    TableRow row = new TableRow().set("MODIFIED_DATE", fields[0])
                            .set("NAME", fields[1])
                            .set("ADDRESS", fields[2]);

                    c.output(row);
                }
            } catch (Exception e) {
                logger.error("Error: {}", e.getMessage());
            }
        }
}

1 Ответ

0 голосов
/ 03 октября 2018

Ошибка была исправлена ​​после комментирования регистрации, выполненной как часть DoFn для каждого элемента.Ведение журнала для каждого элемента не должно выполняться при обработке такого количества элементов.

...