как эффективно создавать определения баз данных на основе базы данных - PullRequest
1 голос
/ 02 мая 2019

Фон

У меня есть некоторые даг, которые извлекают данные из стороннего API.

Счета, которые нам нужно получить, могут со временем меняться.Чтобы определить, какие учетные записи использовать, в зависимости от процесса нам может потребоваться запросить базу данных или сделать HTTP-запрос.

Перед потоком мы просто получили бы список учетных записей в начале скрипта Python.Затем мы перебирали список учетных записей и перетаскивали каждую учетную запись в файл или все, что нам нужно было сделать.

Но теперь, используя воздушный поток, имеет смысл определять задачи на уровне учетной записи и позволять воздушному потоку обрабатывать функциональность повторных попыток, диапазон дат и параллельное выполнение и т. Д.

Таким образом, мой шаблон может выглядеть примерно так:

Tree view of dag.  Each account gets its one task for data pull.

Задача

Поскольку каждая учетная запись является задачей, доступ к списку учетных записей должен осуществляться с помощьюкаждый разборНо поскольку файлы dag анализируются часто, вам не обязательно запрашивать базу данных или ждать вызова REST при каждом анализе dag с каждого компьютера в течение всего дня.Это может быть ресурсоемким и стоить денег.

Вопрос

Есть ли хороший способ кэшировать этот тип конфигурационной информации в локальном файле, в идеале суказанное время жизни?

Мысли

Я думал о паре разных подходов:

  1. запись в файл CSV или Pickleи используйте mtime, чтобы истечь.
    • проблема в том, что я могу получить коллизии, если два процесса попытаются истечь файл одновременно.я не знаю, насколько вероятно это или каковы будут последствия, но, вероятно, ничего страшного.
  2. создать общую базу данных sqlite для всех таких процессов.должен быть создан автоматически при первом обращении к переменной.каждая переменная конфигурации получает строку в таблице.используйте столбец last_modified_datetime, чтобы указать, когда истекает срок действия.
    • требует более сложного кода и зависимостей.
  3. использование переменных воздушного потока
    • приятно то, что он использует существующую БД, поэтому не будет $ за запрос и разумное сетевое отставание, но для него по-прежнему требуется сетьпоездка туда и обратно.
    • имеет преимущество в том, что он идентичен для всех узлов в многоузловой установке.
    • определение того, когда истечет срок действия, вероятно, будет проблематичным, поэтому, вероятно, создаст конфигурационный файл dag для периодического обновления переменных конфигурации.
    • но тогда это усложнит процесс развертывания и разработки - переменные должны быть заполнены, чтобы правильно определить группы обеспечения доступности баз данных - все разработчики должны будут также управлять этим локально, а не создаватьметод кэширования при чтении.
  4. Subdags?
    • никогда не использовал их, но у меня есть подозрение, что они могут быть использованы здесь.Но сообщество, похоже, все равно не рекомендует их использовать ...

Вы справились с этой проблемой?Вы пришли к хорошему решению?Ничто из этого не кажется очень хорошим.

Ответы [ 2 ]

1 голос
/ 03 мая 2019

У меня точно такой же сценарий.

Есть вызов API для нескольких учетных записей.Изначально создал скрипт Python для итерации списка.

Когда я начал использовать Airflow, подумал о том, что вы планируете делать.Перепробовал 2 из перечисленных вами альтернатив.После некоторых экспериментов решил обработать логику повторения в Python с помощью простых блоков try-кроме, если HTTP-вызовы не удаются.Причины:

  1. Один сценарий для поддержки
  2. Меньше объектов воздушного потока
  3. Перезапускаемость легче с одним сценарием на месте.(перезапуск неудавшейся работы в Airflow - не легкая задача (не каламбур))

В конце концов, решать вам, это был мой опыт.

1 голос
/ 03 мая 2019

Интервал синтаксического анализа DAG по умолчанию довольно прост: 5 минут.Но даже это довольно много для большинства людей, поэтому вполне разумно увеличить это, если ваше развертывание не слишком близко к срокам для новых DAG.

В общем, я бы сказал, что это не тактак плохо делать REST-запрос при каждом биении разбора DAG.Кроме того, в настоящее время процесс планирования отделен от процесса синтаксического анализа, что не влияет на скорость планирования ваших задач.Airflow кеширует определение DAG для вас.

Если вы считаете, что у вас все еще есть причины поставить свой собственный кеш поверх этого, я предлагаю кэшировать на сервере определений, а не на стороне Airflow.Например, используя заголовки кеша на конечной точке REST и самостоятельно обрабатывая аннулирование кеша, когда вам это нужно.Но это может быть некоторой преждевременной оптимизацией, поэтому я советую начинать без нее и реализовывать ее, только если вы измеряете убедительные доказательства того, что она вам нужна.

РЕДАКТИРОВАТЬ : относительно веб-сервера и рабочего

Это правда, что веб-сервер также будет запускать анализ DAG, не зная, как часто.Вероятно, после интервала обновления рабочих Guicorn (который по умолчанию составляет 30 секунд).Рабочие будут делать это также по умолчанию в начале каждой задачи, но это можно сохранить, если активировать травление DAG.Хотя я не уверен, что это хорошая идея, я слышал, что это что-то, что не рекомендуется использовать.

Еще одна вещь, которую вы можете попытаться сделать, это кэшировать это в самом процессе Airflow, запоминая функцию, которая делаетдорогой запрос.Для этого в Python есть встроенные функции functools (lru_cache), и вместе с травлением это может быть достаточно и намного проще, чем другие варианты.

...