DAG воздушного потока продолжает повторяться, не показывая никаких ошибок - PullRequest
0 голосов
/ 06 ноября 2018

Я использую Google Composer. У меня есть черт, который использует функцию panda.read_csv() для чтения файла .csv.gz. DAG продолжает попытки, не показывая никаких ошибок. Вот журнал потока воздуха:

 *** Reading remote log from gs://us-central1-data-airflo-dxxxxx-bucket/logs/youtubetv_gcpbucket_to_bq_daily_v2_csv/file_transfer_gcp_to_bq/2018-11-04T20:00:00/1.log.
[2018-11-05 21:03:58,123] {cli.py:374} INFO - Running on host airflow-worker-77846bb966-vgrbz
[2018-11-05 21:03:58,239] {models.py:1196} INFO - Dependencies all met for <TaskInstance: youtubetv_gcpbucket_to_bq_daily_v2_csv.file_transfer_gcp_to_bq 2018-11-04 20:00:00 [queued]>
[2018-11-05 21:03:58,297] {models.py:1196} INFO - Dependencies all met for <TaskInstance: youtubetv_gcpbucket_to_bq_daily_v2_csv.file_transfer_gcp_to_bq 2018-11-04 20:00:00 [queued]>
[2018-11-05 21:03:58,298] {models.py:1406} INFO -
---------------------------------------------------------------------- 
---------
Starting attempt 1 of 
---------------------------------------------------------------------- 
---------

[2018-11-05 21:03:58,337] {models.py:1427} INFO - Executing <Task(BranchPythonOperator): file_transfer_gcp_to_bq> on 2018-11-04 20:00:00
[2018-11-05 21:03:58,338] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run youtubetv_gcpbucket_to_bq_daily_v2_csv file_transfer_gcp_to_bq 2018-11-04T20:00:00 --job_id 15096 --raw -sd DAGS_FOLDER/dags/testdags/youtubetv_gcp_to_bq_v2.py']

код Python в DAG:

from datetime import datetime,timedelta
from airflow import DAG
from airflow import models
import os
import io,logging, sys
import pandas as pd
from io import BytesIO, StringIO

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.bash_operator import BashOperator

#GCP
from google.cloud import storage
import google.cloud
from google.cloud import bigquery
from google.oauth2 import service_account

from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.models import Connection
from airflow.utils.db import provide_session
from airflow.utils.trigger_rule import TriggerRule

def readCSV(checked_date,file_name, **kwargs): 
    subDir=checked_date.replace('-','/')
    fileobj = get_byte_fileobj(BQ_PROJECT_NAME, YOUTUBETV_BUCKET, subDir+"/"+file_name)
    df_chunks = pd.read_csv(fileobj, compression='gzip',memory_map=True, chunksize=1000000) # return TextFileReader
    print ("done reaCSV")
    return df_chunks

DAG:

    file_transfer_gcp_to_bq = BranchPythonOperator(
    task_id='file_transfer_gcp_to_bq',
    provide_context=True,
    python_callable=readCSV,
    op_kwargs={'checked_date': '2018-11-03', 'file_name':'daily_events_xxxxx_partner_report.csv.gz'}
    )

DAG успешно запущена на моей локальной версии воздушного потока.

def readCSV(checked_date,file_name, **kwargs): 
   subDir=checked_date.replace('-','/')
   fileobj = get_byte_fileobj(BQ_PROJECT_NAME, YOUTUBETV_BUCKET, subDir+"/"+file_name)
   df = pd.read_csv(fileobj, compression='gzip',memory_map=True)
   return df

протестировал get_byte_fileobj, и он работает как отдельная функция.

Ответы [ 2 ]

0 голосов
/ 05 декабря 2018

Недавно у меня похожая проблема .

В моем случае это из-за перегрузки работника kubernetes.

Вы также можете наблюдать за производительностью работника на панели инструментов kubernetes, чтобы увидеть, является ли ваш случай проблемой перегрузки кластера.

Если да, вы можете попробовать установить значение конфигурации воздушного потока celeryd_concurrency ниже, чтобы уменьшить парализм в работнике и посмотреть, не снижается ли нагрузка на кластер

enter image description here

0 голосов
/ 09 ноября 2018

На основании этого обсуждения airflow google composer group это известная проблема. Одной из причин может быть из-за перерасхода всех ресурсов композитора (в моем случае памяти)

...