Нет модуля с именем airfow.gcp - как запустить задание потока данных, использующее python3 / beam 2.15? - PullRequest
7 голосов
/ 24 октября 2019

Когда я перехожу к использованию операторов / хуков, таких как BigQueryHook, я вижу сообщение, что эти операторы устарели и используют версию оператора airflow.gcp .... Однако, когда я пытаюсь использовать его в моем dag, он не работает и говорит, что нет модуля с именем airflow.gcp. У меня есть самая свежая версия airflow composer с функциями бета-версии, python3. Можно ли как-то установить эти операторы?

Я пытаюсь запустить задание потока данных в python 3 с использованием луча 2.15. Я пробовал оператор virtualenv, но он не работает, потому что он позволяет только python2.7. Как я могу это сделать?

Ответы [ 2 ]

4 голосов
/ 30 октября 2019

Самая новая версия Airflow, доступная в Composer, - это 1.10.2 или 1.10.3 (в зависимости от региона). К тому времени эти операторы уже были в разделе contrib.

Сосредоточение на том, как запускать задания потока данных Python 3 с Composer, которые вам понадобятся для выпуска новой версии. Однако, если вам нужно немедленное решение, вы можете попытаться создать бэкпорт для fix .

. В этом случае я определил DataFlow3Hook, который расширяет нормальный DataFlowHook, но это делаетне жесткий код python2 в методе start_python_dataflow:

class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        ...
        py_interpreter: str = "python3"
    ):

        ...

        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)

Тогда у нас будет свой пользовательский DataFlowPython3Operator вызов нового хука:

class DataFlowPython3Operator(DataFlowPythonOperator):

    def execute(self, context):
        ...
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)
        ...
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")

Наконец, вВ нашей DAG мы просто используем новый оператор:

task = DataFlowPython3Operator(
    py_file='/home/airflow/gcs/data/main.py',
    task_id=JOB_NAME,
    dag=dag)

Полный код здесь . Работа выполняется с Python 3.6:

enter image description here

Сведения об окружении и используемые зависимости (задание Beam было минимальным примером):

softwareConfig:
  imageVersion: composer-1.8.0-airflow-1.10.3
  pypiPackages:
    apache-beam: ==2.15.0
    google-api-core: ==1.14.3
    google-apitools: ==0.5.28
    google-cloud-core: ==1.0.3
  pythonVersion: '3'

Дайте мне знать, если это работает для вас. Если это так, я бы порекомендовал перенести код в плагин для удобства чтения кода и повторного его использования в группах доступности баз данных.

0 голосов
/ 04 ноября 2019

В качестве альтернативы вы можете использовать PythonVirtualenvOperator на более старых версиях воздушного потока. Учитывая некоторый конвейер луча (заключенный в функцию), сохраненный как dataflow_python3.py:

def main():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    import argparse
    import logging

    class ETL(beam.DoFn):
        def process(self, row):
            #do data processing


    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/input/input.txt',
            help='Input file to process.'
            )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
              '--runner=DataflowRunner',
              '--project=project_id',
              '--region=region',
              '--staging_location=gs://bucket/staging/',
              '--temp_location=gs://bucket/temp/',
              '--job_name=job_id',
              '--setup_file=./setup.py'
              ])

        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p:
            rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
            etl = (rows | 'process data' >> beam.ParDo(ETL()))


    logging.getLogger().setLevel(logging.DEBUG)
    run()

Вы можете запустить его, используя следующий файл DAG:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import sys

import dataflow_python3 as py3 #import your beam pipeline file here 


default_args = {
    'owner': 'John Smith',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'email': ['email@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

CONNECTION_ID = 'proj_id'

with DAG('Dataflow_Python3', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:


    dataflow_python3 = PythonVirtualenvOperator(
        task_id='dataflow_python3',
        python_callable=py3.main, #this is your beam pipeline callable 
        requirements=['apache-beam[gcp]', 'pandas'],
        python_version=3,
        dag=dag
    )

dataflow_python3
...