Я использовал оператор Postgres в S3 для загрузки данных из Postgres в S3. Но недавно мне пришлось экспортировать очень большую таблицу, и мой Airflow composer не работает без каких-либо журналов, это может быть связано с тем, что мы используем функцию NamedTeilitaryFile модуля tempfile Python для создания временного файла, и мы используем это временный файл для загрузки на S3. Поскольку мы используем Composer, он будет загружен в локальную память Composer, и, поскольку размер файла очень велик, происходит сбой.
См. Здесь: https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs
Я проверил оператор RedshiftToS3, поскольку он также использовал хук Postgres, и у него было несколько опций выгрузки, которые могли легко загружаться большие файлы, но я понял, что между Redshift и Postgres нет соответствия 1-1. Так что это невозможно. Есть ли способ разделить мой запрос Postgres? Прямо сейчас я делаю SELECT * FROM TABLENAME
Кроме того, у меня нет никакой информации о таблице.
Я также сталкивался с подобным оператором: https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html
Здесь есть параметр approx_max_file_size_bytes
:
Этот оператор поддерживает возможность разбивать дампы больших таблиц на несколько файлов (см. примечания в файле param docs выше). Этот параметр позволяет разработчикам указывать размер файла разделений.
Из кода я понял, что они создают новый временный файл, когда размер превышает заданный предел, поэтому они разделяют файл в несколько временных файлов, а затем загружать отдельно?
РЕДАКТИРОВАТЬ: Я снова объясню, что именно я пытаюсь сделать. В настоящее время оператор Postgres в S3 создает временный файл и записывает все результаты, возвращаемые курсором, в этот файл, что вызывает проблемы с памятью. Поэтому я могу добавить ограничение max_file_size, и для каждой строки в курсоре я буду записывать результаты в наш временный файл, и если размер нашего временного файла превышает установленный нами предел max_file_size, мы записываем содержимое нашего файл на S3, затем flu sh или удалите этот файл, а затем создайте новый временный файл и запишите следующую строку курсора в этот файл и загрузите этот файл также на S3. Я не уверен, как так изменить оператор?