Я использую конвейер Apache Beam (развернутый с Google Dataflow), который организуется с помощью Apache Airflow.
Файл DAG выглядит следующим образом:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator
import custom_py_file #beam job in this file
default_args = {
'owner': 'name',
'depends_on_past': False,
'start_date': datetime(2016, 1, 1),
'email': ['email@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=1),
}
CONNECTION_ID = 'proj'
with DAG('dag_pipeline', schedule_interval='@once', template_searchpath=['/home/airflow/gcs/dags/'], max_active_runs=15, catchup=True, default_args=default_args) as dag:
lines = PythonVirtualenvOperator(
task_id='lines',
python_callable=custom_py_file.main, #this file has a function main() where the beam job is declared
requirements=['apache-beam[gcp]', 'pandas'],
python_version=3,
dag=dag
)
lines
Файл конвейерного луча (custom_py_file.py
) выглядит следующим образом:
def main():
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
import time
class ETL(beam.DoFn):
def process(self, row):
#process data
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://bucket/input/input.txt',
help='Input file to process.'
)
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=proj',
'--region=region',
'--staging_location=gs://bucket/staging/',
'--temp_location=gs://bucket/temp/',
'--job_name=name-{}'.format(time.strftime("%Y%m%d%h%M%s").lower()),
'--setup_file=/home/airflow/gcs/dags/setup.py',
'--disk_size_gb=350',
'--machine_type=n1-highmem-96',
'--num_workers=24',
'--autoscaling_algorithm=NONE'
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
rows = (p | 'read rows' >> beam.io.ReadFromText(known_args.input))
etl = (rows | 'process data' >> beam.ParDo(ETL()))
p.run().wait_until_finish()
logging.getLogger().setLevel(logging.DEBUG)
run()
Я использую PythonVirtualenvOperator
, потому что я не могу использовать Python3 и BashOperator
с моей текущей версией воздушного потока (Версия: 1.10.2-composer), и мне нужен Python3 для запуска этого конвейера.
Проблема заключается в том, что, несмотря на успешное выполнение, Airflow отправляет еще одно задание потока данных. Обратите внимание, что это НЕ повторная попытка, поскольку журналы показывают, что это все «одна» задача. Однако журналы потока данных показывают, что он выполняет ту же самую работу снова после того, как он уже успешно выполнен один раз.
![dataflow logs](https://i.stack.imgur.com/yEd4N.png)
Что здесь происходит? Успешное задание потока данных не выводит значение 0? Как заставить его перейти к следующему заданию, если оно выполнено правильно? Спасибо!