Airflow DAG - как сначала проверить BQ (при необходимости удалить), а затем запустить задание потока данных? - PullRequest
0 голосов
/ 09 января 2019

Я использую облачный композитор для организации ETL для файлов, поступающих в GCS и отправляемых в BigQuery. У меня есть облачная функция, которая запускает dag при поступлении файла, а облачная функция передает имя файла / местоположение в DAG. В моем DAG у меня есть 2 задачи:

1) Используйте DataflowPythonOperator для запуска задания потока данных, которое считывает данные из текста в GCS и преобразует их и вводит их в BQ, а 2) перемещает файл в корзину неудач / успехов, в зависимости от того, не удалось ли выполнить задание. У каждого файла есть идентификатор файла, который является столбцом в таблице больших запросов. Иногда файл будет редактироваться один или два раза (это не потоковое вещание, где это часто происходит), и я хочу иметь возможность сначала удалить существующие записи для этого файла.

Я посмотрел на других операторов воздушного потока, но хотел, чтобы в моей группе DAG было 2 задачи, прежде чем запускать задание потока данных:

  1. Получить идентификатор файла на основе имени файла (сейчас у меня есть имя файла сопоставления таблицы BigQuery -> идентификатор файла, но я также могу просто ввести JSON, который служит картой, я думаю, если это проще)
  2. Если идентификатор файла уже присутствует в таблице больших запросов (таблице, которая выводит преобразованные данные из задания потока данных), удалите ее, а затем запустите задание потока данных , чтобы у меня была самая свежая информация. Я знаю, что один из вариантов - просто добавить отметку времени и использовать только самые последние записи, но поскольку на файл может приходиться 1 миллион записей, а это не то, что я удаляю 100 файлов в день (может быть, 1-2 вершины) кажется, что это может быть грязно и запутанно.

После задания потока данных, в идеале, перед перемещением файла в папку успеха / неудачи, я хотел бы добавить к некоторой таблице «записей», говорящей, что эта игра была введена в это время. Это будет мой способ увидеть все вставки, которые произошли. Я пытался найти разные способы сделать это, я новичок в облачном компоновщике, поэтому у меня нет четкого представления о том, как это будет работать после 10+ часов исследований, иначе я бы опубликовал код для ввода.

Спасибо, я очень благодарен за помощь всем и приношу извинения, если это не так ясно, как хотелось бы, документация по воздушному потоку очень надежна, но, учитывая облачный композитор и большие запросы, относительно новы, трудно так же тщательно изучить, как выполнить некоторые специфические для GCP задачи.

1 Ответ

0 голосов
/ 14 января 2019

Звучит немного сложно. К счастью, есть операторы практически для каждого сервиса GCP. Другое дело, когда нужно запускать DAG. Вы поняли это? Вы хотите, чтобы облачная функция Google запускалась каждый раз, когда в этот сегмент GCS поступает новый файл.

  1. Запуск вашего DAG

Чтобы вызвать DAG, вам нужно вызвать его с помощью облачной функции Google, которая использует Object Finalize или Обновление метаданных запускает.

  1. Загрузка данных в BigQuery

Если ваш файл уже в GCS и в формате JSON или CSV, использование задания потока данных является излишним. Вы можете использовать GoogleCloudStorageToBigQueryOperator для загрузки файла в BQ.

  1. Отслеживание идентификатора файла

Вероятно, лучшая вещь для вычисления идентификатора файла - это использование оператора Bash или Python из Airflow. Вы можете получить его непосредственно из имени файла?

Если это так, то вы можете иметь оператор Python, который находится перед GoogleCloudStorageObjectSensor , чтобы проверить, находится ли файл в успешном каталоге.

Если это так, вы можете использовать BigQueryOperator для запуска запроса на удаление в BQ.

После этого вы запускаете GoogleCloudStorageToBigQueryOperator.

  1. Перемещение файлов вокруг

Если вы перемещаете файлы из GCS в местоположения GCS, то GoogleCloudStorageToGoogleCloudStorageOperator должен выполнить нужный вам прием. Если ваш оператор загрузки BQ завершился неудачно, перейдите в папку с ошибочными файлами, а в случае успеха - в папку с успешными заданиями.

  1. Регистрация журналов задач

Возможно, все, что вам нужно для отслеживания вставок - это запись информации о задачах в GCS. Ознакомьтесь с как записать информацию о задачах в GCS

Это помогает?

...