Мой шаблон потока данных не запускает шаги перед BigQuery - PullRequest
1 голос
/ 05 мая 2019

У меня есть код, созданный с помощью Apache-Beam, который читает файл CSV и вставляет его в BigQuery. В конвейере я выполняю три шага (применить): 1. прочитать csv, 2. преобразовать текст в TableRow и 3. вставить в BigQuery (BigQueryIO.writeTableRow())

Я создаю шаблон, но когда я собираюсь его выполнить (с помощью dataflow-runner), он выполняет только шаг BigQuery. Не начинайте шаги чтения csv или преобразования в TableRow.

Что может происходить?

- Это неудачное выполнение -

Я попытался прокомментировать блок BigQuery (применить), и там он выполняет предыдущие шаги. Я также попытался сгенерировать конвейер параллельно, а также запускает его. Проблема возникает, когда я связываю конвейер с шагом (применить) BigQuery.

public static void main(String[] args) throws Throwable {

     String sourceFilePath = "gs://dgomez_test/input.csv";
     String tempLocationPath = "gs://dgomez_test/tmp";

     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
     options.setTempLocation(tempLocationPath);
     options.setJobName("csvtobq");
     Pipeline p = Pipeline.create(options);

     p.apply("read csv", TextIO.read().from(sourceFilePath))
      .apply("string to tablerow", ParDo.of(new FormatForBigquery()));
      .apply("write to bigquery",
           BigQueryIO.writeTableRows().to(TABLE)
               .withSchema(FormatForBigquery.getSchema())
               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

     p.run();
}
...