Задание потока данных, чтобы положить CSV в BigQuery - PullRequest
0 голосов
/ 22 сентября 2018

У меня простое задание потока данных, чтобы записать CSV-файл в bigquery.Я использую BigQUeryIO.write с одним parDo для создания tablerow.Задание выполнялось в течение 40 минут, а затем в итоге завершилось неудачно с сообщением об ошибке.

Размер CSV-файла составляет 30 ГБ с примерно 25 00 000 000 записей.

Работапункт был предпринят 4 раза безуспешно.Каждый раз, когда работник со временем терял связь со службой.

Есть ли рекомендуемый подход для загрузки огромного CSV-файла в bigquery с использованием потока данных.Вниз по линии я должен сделать некоторые пользовательские преобразования данных, поэтому с помощью потока данных.В настоящее время просто проверяется время загрузки.

@ProcessElement
    public void processElement(ProcessContext c) {

        TableRow tableRow = new TableRow();
        String[] parts = c.element().split("(?<!\\\\),");
        try{
        for (int i = 0; i < parts.length; i++) {
            tableRow.set(CreateTableRow.sourceField.get(i), parts[i].replaceAll("\\\\,", ","));

        }
        }catch(RuntimeException e){
            e.printStackTrace();
        }
        c.output(tableRow);
    }

Также появляется предупреждение: Автоматическое масштабирование: невозможно достичь цели изменения размера в зоне us-central1-a.QUOTA_EXCEEDED: превышена квота CPUS.Лимит: 8.0 в регионе us-central1.

Но я не решаю регион, в котором должно выполняться задание.

Создается 7 вычислительных движков, которые я вижу, но все они имеют базовый размер n1-standard-1 (1 vCPU,3,75 ГБ памяти).Но могу ли я это контролировать?

Идентификатор задания потока данных: 2018-09-22_04_40_15-17093046238840040545

Статистика при сбое задания: Работники
8 -> 0 Текущее состояние
Автоматическое масштабирование: Увеличено количество рабочих до 593 в зависимости от скорости выполнения текущих шагов.Метрики ресурса Текущие vCPU 8 Общее время vCPU 3.464 vCPU час Текущая память 30 ГБ Общее время памяти 12,99 ГБ час Текущий PD 1,95 ТБ Общее время PD 865,974 ГБ час Текущий SSD PD 0 B Общее время SSD PD 0 ГБ час

Код записи:

dataValue.apply(ParDo.of(new CreateTableRow()))
    .apply(BigQueryIO.writeTableRows().to(tableSpec)
            .withSchema(CreateTableRow.getSchema())
            .withCustomGcsTempLocation(valueProvider)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Код чтения:

TextIO.read().from("gs://test-n/dummyData.csv")

Есть ли способ улучшить производительность при меньшем количестве работников.Чтение части csv или создание части строки таблицы.Можно ли оптимизировать его для лучшей работы с меньшим количеством работников?

Изменил регион на северо-восток Азии, так как там была высокая квота, и изменил тип машины до n1-standard-8.И работа была закончена очень быстро.Полагаю, это произошло из-за ошибки типа внутреннего пространства кучи машины.

В любом случае, теперь она работает.

...