Сбой воздушного потока: строка ParseException 2: 0 не может распознать ввод рядом - PullRequest
0 голосов
/ 12 ноября 2018

Я пытаюсь запустить тестовое задание на Airflow, но получаю следующую ошибку:

FAILED: ParseException 2: 0 не может распознать ввод рядом с 'create_import_table_fct_latest_values' '.' 'HQL'

Вот мой файл Airflow Dag:

import airflow
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.models import DAG

args = {
    'owner': 'raul',
    'start_date': datetime(2018, 11, 12),
    'provide_context': True,
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email': ['raul.gregglino@leroymerlin.ru'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG('opus_data', 
    default_args=args,
    max_active_runs=6,
    schedule_interval="@daily"
)

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='create_import_table_fct_latest_values.hql ',
    hiveconf_jinja_translate=True,
    dag=dag
    )

deps = {}

# Explicity define the dependencies in the DAG
for downstream, upstream_list in deps.iteritems():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)

Вот содержимое моего HQL-файла, на случай, если это может быть проблемой, и я не могу понять:

*I'm testing the connection to understand if the table is created or not, then I'll try to LOAD DATA, hence the LOAD DATA is commented out.
CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product          STRING,
    id_model            STRING,
    id_attribute        STRING,
    attribute_value     STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED ',';

#LOAD DATA LOCAL INPATH
#'/media/windows_share/schemas/opus/fct_latest_values_20181106.csv'
#OVERWRITE INTO TABLE opus_data.fct_latest_values_new_data;

Ответы [ 2 ]

0 голосов
/ 13 ноября 2018

Мне удалось найти ответ на мою проблему.

Это было связано с путем, по которому мой HiveOperator вызывал файл.Поскольку не было определено никакой переменной, указывающей Airflow, где искать, я получил ошибку, о которой упоминал в своем посте.

Как только я определил ее с помощью интерфейса веб-сервера (см. Рисунок), мой dag начал работатьpropertly.enter image description here

Я внес изменение в свой код DAG, касающийся расположения файлов только для организации, и вот так теперь выглядит мой HiveOperator:

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='hql/create_import_table_fct_latest_values2.hql',
    hiveconf_jinja_translate=True,
    dag=dag
    )

Спасибо(@panov.st), который помог мне лично определить мою проблему.

0 голосов
/ 12 ноября 2018

В файле HQL это должно быть FIELDS TERMINATED BY ',':

CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product          STRING,
    id_model            STRING,
    id_attribute        STRING,
    attribute_value     STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

И комментарии должны начинаться с -- в файле HQL, а не #

Также это кажется неправильным и вызывает исключение hql='create_import_table_fct_latest_values.hql '

Посмотрите на этот пример:

 #Create full path for the file
    hql_file_path = os.path.join(os.path.dirname(__file__), source['hql'])
    print hql_file_path
    run_hive_query = HiveOperator(
        task_id='run_hive_query',
        dag = dag,
        hql = """
        {{ local_hive_settings }}
        """ + "\n " + open(hql_file_path, 'r').read()
)

Подробнее см. здесь .

Или поместить все HQL в параметр hql:

hql='CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data ...'
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...