Загрузка CSV с нестандартным разделителем в BQ - PullRequest
0 голосов
/ 11 февраля 2019

Предположим, у меня есть следующие данные в CSV-файле:

'"tom","jones","hello,\nMy name is tom"\x01\n"sarah","smith","hello"\x01\n'

Терминатор строки \x01\n.Можно ли загрузить это непосредственно в GCS (без предварительного предварительного форматирования)?Мой мыслительный процесс заключался в следующем:

  • Загрузите его в CSV, используя нестандартный разделитель (например, \x00ff), чтобы получить все данные в одной строке.
  • Затем выполните основныеDML-файлы «очищают» данные и переформатируют их.

Однако при наличии строк запуска мы сталкиваемся с проблемой, поскольку BQ не «поддерживает» (если вы хотите вызватьэто что) порядок строк.Вот как мои данные теперь выглядят в BQ:

enter image description here

Как мы видим, порядок строк не работает, поэтому это было бы невозможнонапример, «рекомбинировать данные» с UDF для получения нужных нам CSV-данных.

Есть ли здесь другие возможные подходы?Просто чтобы уточнить, я хочу преобразовать CSV-файл, который уже находится в GCS через BigQuery, без необходимости загружать этот файл на отдельный сервер для обработки перед загрузкой в ​​BQ.


Для справки,вот код, который я сейчас использую:

# /tmp/schema_external_nonstandard_csv.json
{
  "schema": {
    "fields": [
      {
        "name": "data",
        "type": "STRING"
      }
    ]
  },
  "sourceFormat": "CSV",
  "sourceUris": [
    "gs://XY-bq/ns.csv"
  ],
  "csvOptions": {
    "fieldDelimiter": "\u00ff",
    "quote": ""
  },
  "maxBadRecords": 1000000
}

$ bq mk --external_table_definition=/tmp/schema_external_nonstandard_csv.json datadocs-163219:bqtesting.ns
$ bq query --nouse_legacy_sql 'CREATE TABLE `XY-163219.bqtesting.ns1` AS select * from `XY-163219.bqtesting.ns`'

1 Ответ

0 голосов
/ 18 февраля 2019

Я думал о некоторых чистых решениях BigQuery:

  1. Указание разделителя с помощью bq.Это не работает для этого случая использования, как описано здесь

"Разделитель может быть любым однобайтовым символом ISO-8859-1."

Использование REGEXP_REPLACE лучшее, что я получил, это одна строка с разрывом строки внутри:
CREATE OR REPLACE TABLE test.separator_final AS
SELECT
  REGEXP_REPLACE(data, r"\\x01\\n", "\n") AS data
FROM
  test.separator_external

enter image description here

Следуя предыдущему пункту, можно использовать «хак», чтобы разбить строку на разные строки (см. Ответ здесь ).Тем не менее, предостережение заключается в том, что вам нужно знать количество разбиений априори, и в этом случае оно не является согласованным.

Тот, который вы уже используете, но добавление номера строки, чтобыданные могут быть объединены обратно.Это может сработать, но обеспечение сохранения порядка строк также может быть сложным.

Если мы рассмотрим использование других продуктов GCP в качестве посредника между GCS и BigQuery, мы можем найти другие интересные решения:

Использование Dataprep, который запускает Dataflow под капотом.Существует заменяющее преобразование ( docs ), и шаблоны потока данных можно создавать и вызывать программным способом.

с использованием потока данных.Я на самом деле протестировал это решение с этим gist , и оно работает: я думаю, что это можно очень хорошо расширить, создав шаблон (пользовательский разделитель может быть входным параметром) и вызывая его каждый раз, когда вы загружаете файлв GCS с облачными функциями (решение NoOps).

Вкратце, мы читаем записи из файла, используя TextIO.read().from(file), где file - путь GCS (укажите input и output параметры при запуске задания).Мы можем дополнительно использовать фиктивный разделитель, используя withDelimiter(), чтобы избежать конфликтов (здесь мы снова ограничены одиночными байтами, поэтому мы не можем напрямую передать действительный).Затем для каждой строки мы делим реальный разделитель на c.element().split("\\\\x01\\\\n").Обратите внимание, что нам нужно экранировать уже экранированные символы (вы можете убедиться в том, что в запросе JSON результаты с нормальной загрузкой), и, следовательно, четырехкратная обратная косая черта.

p
    .apply("GetMessages", TextIO.read().from(file))
        .apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (String line : c.element().split("\\\\x01\\\\n")) {
            if (!line.isEmpty()) {
              c.output(line);
            }
          }
        }
    }))

Результаты:

enter image description here

Имейте в виду, что, как указал @hlagos, вы можете столкнуться с проблемами для очень больших однострочных CSV-файлов либо из-за ограничения строк в BigQuery, либо из-за нерасщепляемых шагов, назначенных дляодин работник в потоке данных.

...