Пользовательский датчик воздушного потока: AttributeError: у объекта 'NoneType' нет атрибута 'get_records' - PullRequest
0 голосов
/ 09 апреля 2019

Я использую Airflow v1.9.0 с Celery Executor. Я настроил разных работников с разными именами очередей, таких как DEV, QA, UAT, PROD. Я написал собственный датчик, который опрашивает исходное соединение БД и целевое соединение БД, выполняет различные запросы и делает некоторые проверки перед запуском downstream-задач. Это работает нормально для нескольких работников. У одного из рабочих этот датчик выдает ошибку AttributeError:

$ airflow test PDI_Incr_20190407_v1  checkCCWatermarkDt 2019-04-09
[2019-04-09 10:02:57,769] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2019-04-09 10:02:57,770] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2019-04-09 10:02:57,771] {__init__.py:45} INFO - Using executor CeleryExecutor
[2019-04-09 10:02:57,817] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags
/usr/local/lib/python2.7/site-packages/airflow/models.py:2160: PendingDeprecationWarning: Invalid arguments were passed to ExternalTaskSensor. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'check_existence': True}
  category=PendingDeprecationWarning
[2019-04-09 10:02:57,989] {base_hook.py:80} INFO - Using connection to: 172.16.20.11:1521/GWPROD
[2019-04-09 10:02:57,991] {base_hook.py:80} INFO - Using connection to: dmuat.cwmcwghvymd3.us-east-1.rds.amazonaws.com:1521/DMUAT
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):
  File "/home/airflow/airflow/plugins/PDIPlugin.py", line 29, in poke
    wm_dt_src = hook_src.get_records(self.sql)
AttributeError: 'NoneType' object has no attribute 'get_records'

Хотя когда я запускаю ту же тестовую команду из CLI Scheduler, она работает нормально. Вышеуказанная проблема выглядит как проблема подключения к базе данных.

Для отладки я проверил соединения с БД из интерфейса Airflow: Профилирование данных -> Специальный запрос Запрос: выберите 1 из двойного; - Это работало нормально

Я также сделал telnet с рабочего узла на хост и порт БД, и это также прошло нормально.

Пользовательский код датчика:

from airflow.plugins_manager import AirflowPlugin
from airflow.hooks.base_hook import BaseHook
from airflow.operators.sensors import SqlSensor

class SensorWatermarkDt(SqlSensor):

    def __init__(self, conn_id, sql, conn_id_tgt, sql_tgt, *args, **kwargs):
        self.sql = sql
        self.conn_id = conn_id
        self.sql_tgt = sql_tgt
        self.conn_id_tgt = conn_id_tgt
        super(SqlSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        hook_src = BaseHook.get_connection(self.conn_id).get_hook()
        hook_tgt = BaseHook.get_connection(self.conn_id_tgt).get_hook()

        self.log.info('Poking: %s', self.sql)
        self.log.info('Poking: %s', self.sql_tgt)
        wm_dt_src = hook_src.get_records(self.sql)
        wm_dt_tgt = hook_tgt.get_records(self.sql_tgt)

        if wm_dt_src <= wm_dt_tgt:
            return False
        else:
            return True

class PDIPlugin(AirflowPlugin):
    name = "PDIPlugin"
    operators = [SensorWatermarkDt]

Фрагмент DAG воздушного потока:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import timedelta,datetime
from airflow.operators import SensorWatermarkDt
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'SenseTeam',
    #'depends_on_past': True,
    'depends_on_past' : False,
    'start_date': datetime(2019, 4, 7, 17, 00),
    'email': [],
    'email_on_failure': False,
    'email_on_retry': False,
    'queue': 'PENTAHO_UAT'
}

dag = DAG(dag_id='PDI_Incr_20190407_v1',
          default_args=default_args,
          max_active_runs=1,
          concurrency=1,
          catchup=False,
          schedule_interval=timedelta(hours=24),
          dagrun_timeout=timedelta(minutes=23*60))

checkCCWatermarkDt = \
        SensorWatermarkDt(task_id='checkCCWatermarkDt',
                          conn_id='CCUSER_SOURCE_GWPROD_RPT',
                          sql="SELECT MAX(CC_WM.CREATETIME) as CURRENT_WATERMARK_DATE FROM CCUSER.CCX_CAPTUREREASON_ETL CC_WM INNER JOIN CCUSER.CCTL_CAPTUREREASON_ETL CC_WMLKP ON  CC_WM.CAPTUREREASON_ETL = CC_WMLKP.ID AND UPPER(CC_WMLKP.DESCRIPTION)= 'WATERMARK'",
                          conn_id_tgt = 'RDS_DMUAT_DMCONFIG',
                          sql_tgt = "SELECT MAX(CURRENT_WATERMARK_DATE) FROM DMCONFIG.PRESTG_DM_WMD_WATERMARKDATE WHERE SCHEMA_NAME = 'CCUSER'",
                          poke_interval=60,
                          dag=dag)
...

Я перезапустил веб-сервер, планировщик и работника воздушного потока после добавления этого плагина в этот рабочий узел.

Что мне здесь не хватает?

...