Как экспортировать CSV-файл в таблицу BigQery, используя поток данных Java? - PullRequest
0 голосов
/ 12 июня 2019

Я хочу прочитать CSV-файл из облачного хранилища и записать его в таблицу больших запросов со столбцами, используя поток данных в Java. Как я могу установить заголовки для файла csv при записи в bigquery?

Ответы [ 2 ]

0 голосов
/ 14 июня 2019

Я выполнил аналогичную задачу и использовал общую библиотеку Apache в функции ParDo для извлечения данных из файлов CSV, а затем преобразовал их в объекты таблиц строк для BQ.

String fileData = c.element();
BufferedReader fileReader = new BufferedReader(new InputStreamReader(
      new ByteArrayInputStream(fileData.getBytes("UTF-8")), "UTF-8"));
CSVParser csvParser = new CSVParser(fileReader,CSVFormat.DEFAULT.withFirstRecordAsHeader().withIgnoreHeaderCase().withTrim());
Iterable<CSVRecord> csvRecords = csvParser.getRecords(); 

for (CSVRecord csvRecord : csvRecords) {
    TableRow row = new TableRow();
    checkAndConvertIntoBqDataType(csvRecord.toMap());
    c.output(row);
}
0 голосов
/ 14 июня 2019

Здесь нужно решить две проблемы

  1. Пропуск заголовка при чтении данных и
  2. Использование заголовка для правильного заполнения столбцов таблицы больших запросов.

Для (1) это, по состоянию на июнь 2019 года, изначально не реализовано , хотя вы можете попробовать опции, перечисленные в Пропуск строк заголовка - возможно ли это с Cloud DataFlow? .Для (2) проще всего было бы прочитать первую строку вашего CSV в главной программе и передать список имен столбцов в конструкторе в DoFn, который преобразует строки CSV в объекты TableRow, готовые для записи в Bigquery.

Ваша окончательная программа будет выглядеть примерно так:

public void CsvToBigquery(csvInputPattern, bigqueryTable) {
  final String[] columns = readAndSplitFirstLineOfFirstFile(csvInputPattern);
  Pipeline p = new Pipeline.create(...);
  p
    .apply(TextIO.read().from(csvInputPattern)
    .apply(Filter.by(new MatchIfNonHeader())
    .apply(ParDo.of(new DoFn<String, TableRow>() {
             ... // use columns here to TableRows
           })
    .apply(BigtableIO.write().withTableId(bigqueryTable)...);
}
...