Воздушный поток Python 3.6 с оператором, который требует 2,7 - PullRequest
0 голосов
/ 06 июля 2018

В настоящее время я использую экземпляр airflow (1.9.0) на python 3.6.5. У меня есть ручной рабочий процесс, который я хотел бы перенести в группу DAG. Этот ручной рабочий процесс теперь требует кода, написанного на python 2 и 3. Давайте упростим мою DAG до 3 шагов:

  1. Задание потока данных, которое обрабатывает данные и устанавливает данные для машины Учебная подготовка
  2. Тренировочная работа Tensorflow ML
  3. Другое PythonOperators, которые я написал, используя код Python 3

Задание потока данных написано на python 2.7 (требуется google), а код модели тензорного потока - на python 3. Если посмотреть на «MLEngineTrainingOperator» в airflow 1.9.0, есть параметр python_version, который устанавливает «Используемую версию Python». в обучении ".

Вопросы:

  • Могу ли я динамически указывать конкретную версию Python в рабочем окружающая среда
  • Должен ли я просто установить поток воздуха на python 2.7, чтобы запустить шаг 1)?
  • Могу ли я иметь код модели тензорного потока в Python 3, который просто упаковывается и отправляется через MlEngineTraining, работающий на Python 2?
  • Нужно ли переписывать мои 3) операторы в python 2?

Ответы [ 2 ]

0 голосов
/ 11 июля 2018

Хорошо, из коробки нельзя запустить работника потока воздуха Python 2 на общем кластере потока воздуха Python 3:

Airflow использует SQLAlchemy (я полагаю, что для чтения и записи метаданных о группах обеспечения доступности баз данных в базу данных). Когда вы запускаете группу обеспечения доступности баз данных на работнике, она считывает из базы данных маринованную информацию об этой группе доступности базы данных. Если ваши другие нерабочие компоненты находятся в Python 3, они будут записывать в БД в Pickle 4, в то время как рабочий будет пытаться читать из БД в Python 2.

В частности, посмотрите в SQLAlchemy посмотрите на sqltypes.py:

class PickleType(TypeDecorator):
    """Holds Python objects, which are serialized using pickle.

    PickleType builds upon the Binary type to apply Python's
    ``pickle.dumps()`` to incoming objects, and ``pickle.loads()`` on
    the way out, allowing any pickleable Python object to be stored as
    a serialized binary field.

    To allow ORM change events to propagate for elements associated
    with :class:`.PickleType`, see :ref:`mutable_toplevel`.

    """

    impl = LargeBinary

    def __init__(self, protocol=pickle.HIGHEST_PROTOCOL,
                 pickler=None, comparator=None): 

и затем в compat.py, который в конечном итоге делает травление в sqltypes.py.

py36 = sys.version_info >= (3, 6)
py33 = sys.version_info >= (3, 3)
py35 = sys.version_info >= (3, 5)
py32 = sys.version_info >= (3, 2)
py3k = sys.version_info >= (3, 0)
py2k = sys.version_info < (3, 0)
py265 = sys.version_info >= (2, 6, 5)
jython = sys.platform.startswith('java')
pypy = hasattr(sys, 'pypy_version_info')
win32 = sys.platform.startswith('win')
cpython = not pypy and not jython  # TODO: something better for this ?

import collections
next = next

if py3k:
    import pickle
else:
    try:
        import cPickle as pickle
    except ImportError:
        import pickle

Также donot_pickle = True в потоке воздуха, похоже, не влияет на это ??? Может быть, потому что согласно здесь это относится только к засыпкам?

0 голосов
/ 06 июля 2018

Не существует способа динамически указать версию Python на рабочем месте. Однако если вы используете Celery executor, вы можете запускать несколько рабочих на разных серверах / vms или в разных виртуальных средах.

Вы можете иметь одного работника, работающего на Python 3, и одного, работающего на 2.7, и каждый из них будет прослушивать разные очереди. Это можно сделать тремя разными способами:

  • При запуске работника вы можете добавить -q [queue-name] флаг
  • установить env на AIRFLOW__CELERY__DEFAULT_QUEUE
  • обновление default_queue в [celery] в файле airflow.cfg.

Затем в определениях ваших задач укажите параметр queue, изменяющий очередь в зависимости от того, какую версию Python должна выполнять задача.

Я не знаком с MLEngineOperator, но вы можете указать python_version в PythonOperator, который должен запускать его в virtualenv этой версии. В качестве альтернативы вы можете использовать BashOperator, написать код для запуска в другом файле и указать команду python для его запуска, используя абсолютный путь к той версии python, которую вы хотите использовать.

Независимо от того, как выполняется задача, вам просто нужно убедиться, что сам DAG совместим с версией Python, в которой вы его используете. то есть. если вы собираетесь запустить работника воздушного потока в разных версиях Python, сам файл DAG должен быть совместим с Python 2 и 3. Группа обеспечения доступности баз данных может иметь дополнительные файловые зависимости, которые она использует, и несовместимости версий.

...