Я думал о некоторых чистых решениях BigQuery:
- Указание разделителя с помощью bq.Это не работает для этого случая использования, как описано здесь
"Разделитель может быть любым однобайтовым символом ISO-8859-1."
Использование
REGEXP_REPLACE
лучшее, что я получил, это одна строка с разрывом строки внутри:
CREATE OR REPLACE TABLE test.separator_final AS
SELECT
REGEXP_REPLACE(data, r"\\x01\\n", "\n") AS data
FROM
test.separator_external
Следуя предыдущему пункту, можно использовать «хак», чтобы разбить строку на разные строки (см. Ответ здесь ).Тем не менее, предостережение заключается в том, что вам нужно знать количество разбиений априори, и в этом случае оно не является согласованным.
Тот, который вы уже используете, но добавление номера строки, чтобыданные могут быть объединены обратно.Это может сработать, но обеспечение сохранения порядка строк также может быть сложным.
Если мы рассмотрим использование других продуктов GCP в качестве посредника между GCS и BigQuery, мы можем найти другие интересные решения:
Использование Dataprep, который запускает Dataflow под капотом.Существует заменяющее преобразование ( docs ), и шаблоны потока данных можно создавать и вызывать программным способом.
с использованием потока данных.Я на самом деле протестировал это решение с этим gist , и оно работает: я думаю, что это можно очень хорошо расширить, создав шаблон (пользовательский разделитель может быть входным параметром) и вызывая его каждый раз, когда вы загружаете файлв GCS с облачными функциями (решение NoOps).
Вкратце, мы читаем записи из файла, используя TextIO.read().from(file)
, где file
- путь GCS (укажите input
и output
параметры при запуске задания).Мы можем дополнительно использовать фиктивный разделитель, используя withDelimiter()
, чтобы избежать конфликтов (здесь мы снова ограничены одиночными байтами, поэтому мы не можем напрямую передать действительный).Затем для каждой строки мы делим реальный разделитель на c.element().split("\\\\x01\\\\n")
.Обратите внимание, что нам нужно экранировать уже экранированные символы (вы можете убедиться в том, что в запросе JSON результаты с нормальной загрузкой), и, следовательно, четырехкратная обратная косая черта.
p
.apply("GetMessages", TextIO.read().from(file))
.apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String line : c.element().split("\\\\x01\\\\n")) {
if (!line.isEmpty()) {
c.output(line);
}
}
}
}))
Результаты:
Имейте в виду, что, как указал @hlagos, вы можете столкнуться с проблемами для очень больших однострочных CSV-файлов либо из-за ограничения строк в BigQuery, либо из-за нерасщепляемых шагов, назначенных дляодин работник в потоке данных.