У меня простое задание потока данных, чтобы записать CSV-файл в bigquery.Я использую BigQUeryIO.write с одним parDo для создания tablerow.Задание выполнялось в течение 40 минут, а затем в итоге завершилось неудачно с сообщением об ошибке.
Размер CSV-файла составляет 30 ГБ с примерно 25 00 000 000 записей.
Работапункт был предпринят 4 раза безуспешно.Каждый раз, когда работник со временем терял связь со службой.
Есть ли рекомендуемый подход для загрузки огромного CSV-файла в bigquery с использованием потока данных.Вниз по линии я должен сделать некоторые пользовательские преобразования данных, поэтому с помощью потока данных.В настоящее время просто проверяется время загрузки.
@ProcessElement
public void processElement(ProcessContext c) {
TableRow tableRow = new TableRow();
String[] parts = c.element().split("(?<!\\\\),");
try{
for (int i = 0; i < parts.length; i++) {
tableRow.set(CreateTableRow.sourceField.get(i), parts[i].replaceAll("\\\\,", ","));
}
}catch(RuntimeException e){
e.printStackTrace();
}
c.output(tableRow);
}
Также появляется предупреждение: Автоматическое масштабирование: невозможно достичь цели изменения размера в зоне us-central1-a.QUOTA_EXCEEDED: превышена квота CPUS.Лимит: 8.0 в регионе us-central1.
Но я не решаю регион, в котором должно выполняться задание.
Создается 7 вычислительных движков, которые я вижу, но все они имеют базовый размер n1-standard-1 (1 vCPU,3,75 ГБ памяти).Но могу ли я это контролировать?
Идентификатор задания потока данных: 2018-09-22_04_40_15-17093046238840040545
Статистика при сбое задания: Работники
8 -> 0 Текущее состояние
Автоматическое масштабирование: Увеличено количество рабочих до 593 в зависимости от скорости выполнения текущих шагов.Метрики ресурса Текущие vCPU 8 Общее время vCPU 3.464 vCPU час Текущая память 30 ГБ Общее время памяти 12,99 ГБ час Текущий PD 1,95 ТБ Общее время PD 865,974 ГБ час Текущий SSD PD 0 B Общее время SSD PD 0 ГБ час
Код записи:
dataValue.apply(ParDo.of(new CreateTableRow()))
.apply(BigQueryIO.writeTableRows().to(tableSpec)
.withSchema(CreateTableRow.getSchema())
.withCustomGcsTempLocation(valueProvider)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Код чтения:
TextIO.read().from("gs://test-n/dummyData.csv")
Есть ли способ улучшить производительность при меньшем количестве работников.Чтение части csv или создание части строки таблицы.Можно ли оптимизировать его для лучшей работы с меньшим количеством работников?
Изменил регион на северо-восток Азии, так как там была высокая квота, и изменил тип машины до n1-standard-8.И работа была закончена очень быстро.Полагаю, это произошло из-за ошибки типа внутреннего пространства кучи машины.
В любом случае, теперь она работает.