У меня есть код, созданный с помощью Apache-Beam, который читает файл CSV и вставляет его в BigQuery. В конвейере я выполняю три шага (применить): 1. прочитать csv, 2. преобразовать текст в TableRow и 3. вставить в BigQuery (BigQueryIO.writeTableRow()
)
Я создаю шаблон, но когда я собираюсь его выполнить (с помощью dataflow-runner), он выполняет только шаг BigQuery. Не начинайте шаги чтения csv или преобразования в TableRow.
Что может происходить?
- Это неудачное выполнение -
Я попытался прокомментировать блок BigQuery (применить), и там он выполняет предыдущие шаги. Я также попытался сгенерировать конвейер параллельно, а также запускает его. Проблема возникает, когда я связываю конвейер с шагом (применить) BigQuery.
public static void main(String[] args) throws Throwable {
String sourceFilePath = "gs://dgomez_test/input.csv";
String tempLocationPath = "gs://dgomez_test/tmp";
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setTempLocation(tempLocationPath);
options.setJobName("csvtobq");
Pipeline p = Pipeline.create(options);
p.apply("read csv", TextIO.read().from(sourceFilePath))
.apply("string to tablerow", ParDo.of(new FormatForBigquery()));
.apply("write to bigquery",
BigQueryIO.writeTableRows().to(TABLE)
.withSchema(FormatForBigquery.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run();
}