Успешный конвейер потока данных, запускаемый несколько раз через PythonVirtualenvOperator в Airflow - PullRequest
0 голосов
/ 05 ноября 2019

Я использую конвейер Apache Beam (развернутый с Google Dataflow), который организуется с помощью Apache Airflow.

Файл DAG выглядит следующим образом:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator

import custom_py_file #beam job in this file 


default_args = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 1),
    'email': ['email@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

CONNECTION_ID = 'proj'

with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:


    lines = PythonVirtualenvOperator(
        task_id='lines',
        python_callable=custom_py_file.main, #this file has a function main() where the beam job is declared 
        requirements=['apache-beam[gcp]', 'pandas'],
        python_version=3,
        dag=dag
    )

lines

Файл конвейерного луча (custom_py_file.py) выглядит следующим образом:

def main():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    import argparse
    import time


    class ETL(beam.DoFn):
        def process(self, row):
            #process data 

    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input',
            dest='input',
            default='gs://bucket/input/input.txt',
            help='Input file to process.'
            )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
              '--runner=DataflowRunner',
              '--project=proj',
              '--region=region',
              '--staging_location=gs://bucket/staging/',
              '--temp_location=gs://bucket/temp/',
              '--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
              '--setup_file=/home/airflow/gcs/dags/setup.py',
              '--disk_size_gb=350',
              '--machine_type=n1-highmem-96',
              '--num_workers=24',
              '--autoscaling_algorithm=NONE'
              ]) 

        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p:
            rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
            etl = (rows | 'process data' >> beam.ParDo(ETL()))

        p.run().wait_until_finish()

    logging.getLogger().setLevel(logging.DEBUG)
    run()

Я использую PythonVirtualenvOperator, потому что я не могу использовать Python3 и BashOperator с моей текущей версией воздушного потока (Версия: 1.10.2-composer), и мне нужен Python3 для запуска этого конвейера.

Проблема заключается в том, что, несмотря на успешное выполнение, Airflow отправляет еще одно задание потока данных. Обратите внимание, что это НЕ повторная попытка, поскольку журналы показывают, что это все «одна» задача. Однако журналы потока данных показывают, что он выполняет ту же самую работу снова после того, как он уже успешно выполнен один раз.

dataflow logs

Что здесь происходит? Успешное задание потока данных не выводит значение 0? Как заставить его перейти к следующему заданию, если оно выполнено правильно? Спасибо!

1 Ответ

1 голос
/ 05 ноября 2019

Тот факт, что это не считается повторной попыткой и выполнение одного задания после завершения первого, заставило меня заподозрить нечто подобное this . Проверяя ваш код Python, я вижу, что вы вызываете и with beam.Pipeline(), и p.run():

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()

Это вызовет два последовательных выполнения. Вы можете сделать любой вариант (но не оба):

with beam.Pipeline(options=pipeline_options) as p:
    rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
    etl = (rows | 'process data' >> beam.ParDo(ETL()))

p = beam.Pipeline(options=pipeline_options)

rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))

p.run().wait_until_finish()
...