Apache Airflow SparkSQLOperator продолжает печатать пустые логи - PullRequest
0 голосов
/ 16 мая 2018

Я написал очень простой знак потока воздуха следующим образом:

import airflow                                                                                                                                                
from airflow import DAG                                                                                                                                        from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator                                                                                     
from datetime import timedelta                                                                                                                                 from datetime import datetime as dt                                                                                                                           

default_args = {                                                                                                                                              
             'owner': 'zxy',                                                                                                                                       'depends_on_past': False,                                                                                                                             
             'email_on_failure': True,                                                                                                                                      'email_on_retry': True,                                                                                                                               
             'retries': 1,                                                                                                                                         
             'retry_delay': timedelta(minutes=5)                                                                                                                   
             }                                                                                                                                                     

dag = DAG(                                                                                                                                                    
             'my_first_dag',                                                                                                                                      
             default_args=default_args,                                                                                                                            
             #start_date=dt.strptime('2018-05-16', '%Y-%m-%d'),                                                                                                    
             start_date=airflow.utils.dates.days_ago(2),                                                                                                           
             description='My First Airflow DAG',                                                                                                               
             schedule_interval=timedelta(minutes=5))                                                                                                               

sql = r'''select count(u) from some_table where time=20180513 and platform='iOS' '''                                                    
t1 = SparkSqlOperator(task_id='Count_Ads_U', conn_id='spark_default',sql=sql, dag=dag)  

Затем я запустил airflow scheduler, чтобы запланировать задание.

Задание дало правильное число успешно, но заданиепродолжал печатать пустые журналы следующим образом, поэтому не может успешно остановиться:

[2018-05-16 06: 33: 07,505] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06: 33: 07,505] {spark_sql_hook.py:142} INFO - b'18 / 05/16 06:33:07 INFO spark.SparkContext: успешно остановлен SparkContext \ n '
[2018-05-16 06: 33: 07,506] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06: 33: 07,506] {spark_sql_hook.py: 142} INFO - b'18 / 05/16 06:33:07 INFO util.ShutdownHookManager: Хук выключения вызван \ n '
[2018-05-16 06: 33: 07,506] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06: 33: 07,506] {spark_sql_hook.py: 142} INFO - b'18 / 05/16 06:33:07 INFO util.ShutdownHookManager: Удаление каталога / tmp / spark-fbb4089c-338b-4b0e-a394-975f45b307a8 \ n '
[2018-05-16 06: 33: 07,509] {base_task_runner.py:98} ИНФОРМАЦИЯ - Подзадача: [2018-05-16 06: 33: 07,509] {spark_sql_hook.py:142} INFO - b'18 / 05/16 06:33:07 INFO util.ShutdownHookManager: Удаление каталога / apps / data / spark / temp / spark-f6b6695f-24e4-4db0-ae2b-29b6836ab9c3 \ n '
[2018-05-16 06: 33: 07,902] {base_task_runner.py:98} INFO- Подзадача: [2018-05-16 06: 33: 07,902] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO- Подзадача: [2018-05-16 06: 33: 07,902] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO- Подзадача: [2018-05-16 06: 33: 07,902] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO- Подзадача: [2018-05-16 06: 33: 07,902] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO- Подзадача: [2018-05-16 06: 33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO- Подзадача: [2018-05-16 06: 33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO- Подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,903] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,904] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06:33: 07,903] {spark_sql_hook.py:142} ИНФОРМАЦИЯ - b ''
[2018-05-16 06: 33: 07,904] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06: 33: 07,903] {spark_sql_hook.py:142} INFO - b ''
[2018-05-16 06: 33: 07,904] {base_task_runner.py:98} INFO - подзадача: [2018-05-16 06: 33: 07,903] {spark_sql_hook.py:142} INFO - b ''

Пустой журнал продолжался бесконечно, пока я не остановил планировщик с помощью Ctr + C.

версия воздушного потока v1.9.0.

1 Ответ

0 голосов
/ 16 мая 2018

Проблема решена.

Это вызвано проблемой byte literal vs string literal, если вы используете Python 3.x (строка 146 из https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py):

for line in iter(self._sp.stdout.readline, ''):
    self.log.info(line)

Часовой, используемый в iter, это '', то есть пустой string literal. Но фактический контент в stdout - byte literals вместо string literals (см. Этот пост для справки: Что делает символ 'b' перед строковым литералом? ), как можно судить по префикс b в каждой строке журнала, поэтому цикл for по какой-то причине никогда не заканчивается.

Я исправил проблему, заменив '' на b''.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...