Я использую облачный композитор для организации ETL для файлов, поступающих в GCS и отправляемых в BigQuery. У меня есть облачная функция, которая запускает dag при поступлении файла, а облачная функция передает имя файла / местоположение в DAG. В моем DAG у меня есть 2 задачи:
1) Используйте DataflowPythonOperator
для запуска задания потока данных, которое считывает данные из текста в GCS и преобразует их и вводит их в BQ, а 2) перемещает файл в корзину неудач / успехов, в зависимости от того, не удалось ли выполнить задание.
У каждого файла есть идентификатор файла, который является столбцом в таблице больших запросов. Иногда файл будет редактироваться один или два раза (это не потоковое вещание, где это часто происходит), и я хочу иметь возможность сначала удалить существующие записи для этого файла.
Я посмотрел на других операторов воздушного потока, но хотел, чтобы в моей группе DAG было 2 задачи, прежде чем запускать задание потока данных:
- Получить идентификатор файла на основе имени файла (сейчас у меня есть имя файла сопоставления таблицы BigQuery -> идентификатор файла, но я также могу просто ввести JSON, который служит картой, я думаю, если это проще)
- Если идентификатор файла уже присутствует в таблице больших запросов (таблице, которая выводит преобразованные данные из задания потока данных), удалите ее, а затем запустите задание потока данных , чтобы у меня была самая свежая информация. Я знаю, что один из вариантов - просто добавить отметку времени и использовать только самые последние записи, но поскольку на файл может приходиться 1 миллион записей, а это не то, что я удаляю 100 файлов в день (может быть, 1-2 вершины) кажется, что это может быть грязно и запутанно.
После задания потока данных, в идеале, перед перемещением файла в папку успеха / неудачи, я хотел бы добавить к некоторой таблице «записей», говорящей, что эта игра была введена в это время. Это будет мой способ увидеть все вставки, которые произошли.
Я пытался найти разные способы сделать это, я новичок в облачном компоновщике, поэтому у меня нет четкого представления о том, как это будет работать после 10+ часов исследований, иначе я бы опубликовал код для ввода.
Спасибо, я очень благодарен за помощь всем и приношу извинения, если это не так ясно, как хотелось бы, документация по воздушному потоку очень надежна, но, учитывая облачный композитор и большие запросы, относительно новы, трудно так же тщательно изучить, как выполнить некоторые специфические для GCP задачи.