Как запустить работу Talend .jar в Airflow? - PullRequest
0 голосов
/ 30 апреля 2019

Привет, люди Земли!Я использую Airflow для планирования и запуска .jar Задание, созданное с помощью Talend Open Studio BigData , я добавил свою работу в качестве группы DAG в Airflow, создав файл .py в AIRFLOW_HOME/dags:

из DAG импорта воздушного потока из airflow.operators.bash_operator import BashOperator из datetime import datetime

import os
import sys


bib_app = "/home/user/Images/JObforAirflow/test/test_run.sh"
default_args = {
    'owner': 'yabid',
    'depends_on_past': False,
    'start_date': datetime(2019, 4, 30),
    'provide_context': True}

args = {
  'owner': 'yabid',
  'start_date': datetime(2019, 4, 25),
  'provide_context': True}

dag = DAG('run_jar', 
default_args=default_args,
description='Dag for batch job')



t1 = BashOperator(
    task_id='dependency',
    bash_command= bib_app,
    dag=dag)


t2 = BashOperator(
 task_id = 't2',
 dag = dag,
 bash_command = 'java -cp /home/user/Images/JObforAirflow/test/jobbatch.jar'
 )

t1.set_upstream(t2)

, но когда я запускаю вручную мой dag, он автоматически выходит из строя

задание работает на Talend,

here is the log file generated by Airflow :
*** Reading local file: /home/user/airflow/logs/run_jar/t2/2019-04-30T16:36:48.390314+00:00/1.log
[2019-04-30 17:37:10,262] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: run_jar.t2 2019-04-30T16:36:48.390314+00:00 [queued]>
[2019-04-30 17:37:10,266] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: run_jar.t2 2019-04-30T16:36:48.390314+00:00 [queued]>
[2019-04-30 17:37:10,266] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2019-04-30 17:37:10,266] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-04-30 17:37:10,266] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2019-04-30 17:37:10,323] {__init__.py:1374} INFO - Executing <Task(BashOperator): t2> on 2019-04-30T16:36:48.390314+00:00
[2019-04-30 17:37:10,323] {base_task_runner.py:119} INFO - Running: [u'airflow', u'run', 'run_jar', 't2', '2019-04-30T16:36:48.390314+00:00', u'--job_id', '22', u'--raw', u'-sd', u'DAGS_FOLDER/run_jar.py', u'--cfg_path', '/tmp/tmpd_yKoR']
[2019-04-30 17:37:10,761] {base_task_runner.py:101} INFO - Job 22: Subtask t2 [2019-04-30 17:37:10,760] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-04-30 17:37:10,933] {base_task_runner.py:101} INFO - Job 22: Subtask t2 [2019-04-30 17:37:10,932] {__init__.py:305} INFO - Filling up the DagBag from /home/user/airflow/dags/run_jar.py
[2019-04-30 17:37:10,945] {base_task_runner.py:101} INFO - Job 22: Subtask t2 [2019-04-30 17:37:10,945] {cli.py:517} INFO - Running <TaskInstance: run_jar.t2 2019-04-30T16:36:48.390314+00:00 [running]> on host user-OptiPlex-3020
[2019-04-30 17:37:10,954] {bash_operator.py:81} INFO - Tmp dir root location: 
 /tmp
[2019-04-30 17:37:10,954] {bash_operator.py:90} INFO - Exporting the following env vars:
AIRFLOW_CTX_TASK_ID=t2
AIRFLOW_CTX_DAG_ID=run_jar
AIRFLOW_CTX_EXECUTION_DATE=2019-04-30T16:36:48.390314+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2019-04-30T16:36:48.390314+00:00
[2019-04-30 17:37:10,955] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp8m82nT/t2_l6p9W
[2019-04-30 17:37:10,955] {bash_operator.py:114} INFO - Running command: java -cp /home/user/Images/JObforAirflow/test/jobbatch.jar
[2019-04-30 17:37:10,959] {bash_operator.py:123} INFO - Output:
[2019-04-30 17:37:11,020] {bash_operator.py:127} INFO - Syntaxe : java [-options] class [args...]
[2019-04-30 17:37:11,020] {bash_operator.py:127} INFO -            (pour l'exécution d'une classe)
[2019-04-30 17:37:11,020] {bash_operator.py:127} INFO -    ou  java [-options] -jar jarfile [args...]
[2019-04-30 17:37:11,020] {bash_operator.py:127} INFO -            (pour l'exécution d'un fichier JAR)
[2019-04-30 17:37:11,020] {bash_operator.py:127} INFO - où les options comprennent :
[2019-04-30 17:37:11,020] {bash_operator.py:127} INFO -     -d32      utilisez le modèle de données 32 bits s'il est disponible
[2019-04-30 17:37:11,020] {bash_operator.py:127} INFO -     -d64      utilisez le modèle de données 64 bits s'il est disponible
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -     -server   pour sélectionner la machine virtuelle "server"
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -     -zero     pour sélectionner la machine virtuelle "zero"
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -     -dcevm    pour sélectionner la machine virtuelle "dcevm"
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -                   La machine virtuelle par défaut est server,
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -                   car vous exécutez une machine de classe de serveur.
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO - 
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO - 
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -     -cp <class search path of directories and zip/jar files>
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -     -classpath <class search path of directories and zip/jar files>
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -                   Liste de répertoires, d'archives JAR et
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -                    d'archives ZIP séparés par des :, dans laquelle rechercher les fichiers de classe.
[2019-04-30 17:37:11,021] {bash_operator.py:127} INFO -     -D<name>=<value>
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -                   définition d'une propriété système
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -     -verbose:[class|gc|jni]
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -                   activation de la sortie en mode verbose
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -     -version      impression de la version du produit et fin de l'opération
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -     -version:<value>
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -                   Avertissement : cette fonctionnalité est en phase d'abandon et sera enlevée
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -                   dans une version future.
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -                   exécution de la version spécifiée obligatoire
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -     -showversion  impression de la version du produit et poursuite de l'opération
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -     -jre-restrict-search | -no-jre-restrict-search
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -                   Avertissement : cette fonctionnalité est en phase d'abandon et sera enlevée
[2019-04-30 17:37:11,022] {bash_operator.py:127} INFO -                   dans une version future.
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -                   inclusion/exclusion des environnements JRE privés de l'utilisateur dans la recherche de version
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -? -help      impression du message d'aide
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -X            impression de l'aide sur les options non standard
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -ea[:<packagename>...|:<classname>]
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -enableassertions[:<packagename>...|:<classname>]
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -                   activation des assertions avec la granularité spécifiée
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -da[:<packagename>...|:<classname>]
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -disableassertions[:<packagename>...|:<classname>]
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -                   désactivation des assertions avec la granularité spécifiée
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -esa | -enablesystemassertions
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -                   activation des assertions système
[2019-04-30 17:37:11,023] {bash_operator.py:127} INFO -     -dsa | -disablesystemassertions
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -                   désactivation des assertions système
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -     -agentlib:<libname>[=<options>]
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -                   chargement de la bibliothèque d'agent natif <libname>, par exemple -agentlib:hprof
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -                   voir également, -agentlib:jdwp=help et -agentlib:hprof=help
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -     -agentpath:<pathname>[=<options>]
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -                   chargement de la bibliothèque d'agent natif via le chemin d'accès complet
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -     -javaagent:<jarpath>[=<options>]
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -                   chargement de l'agent du langage de programmation Java, voir java.lang.instrument
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -     -splash:<imagepath>
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO -                   affichage de l'écran d'accueil avec l'image spécifiée
[2019-04-30 17:37:11,024] {bash_operator.py:127} INFO - Voir http://www.oracle.com/technetwork/java/javase/documentation/index.html pour plus de détails.
[2019-04-30 17:37:11,027] {bash_operator.py:131} INFO - Command exited with return code 1
[2019-04-30 17:37:11,031] {__init__.py:1580} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/user/.local/lib/python2.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/user/.local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2019-04-30 17:37:11,032] {__init__.py:1611} INFO - Marking task as FAILED.
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2 Traceback (most recent call last):
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2   File "/home/user/.local/bin/airflow", line 32, in <module>
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2     args.func(args)
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2   File "/home/user/.local/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2     return f(*args, **kwargs)
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2   File "/home/user/.local/lib/python2.7/site-packages/airflow/bin/cli.py", line 523, in run
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2     _run(args, dag, ti)
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2   File "/home/user/.local/lib/python2.7/site-packages/airflow/bin/cli.py", line 442, in _run
[2019-04-30 17:37:11,094] {base_task_runner.py:101} INFO - Job 22: Subtask t2     pool=args.pool,
[2019-04-30 17:37:11,095] {base_task_runner.py:101} INFO - Job 22: Subtask t2   File "/home/user/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-04-30 17:37:11,095] {base_task_runner.py:101} INFO - Job 22: Subtask t2     return func(*args, **kwargs)
[2019-04-30 17:37:11,095] {base_task_runner.py:101} INFO - Job 22: Subtask t2   File "/home/user/.local/lib/python2.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
[2019-04-30 17:37:11,095] {base_task_runner.py:101} INFO - Job 22: Subtask t2     result = task_copy.execute(context=context)
[2019-04-30 17:37:11,095] {base_task_runner.py:101} INFO - Job 22: Subtask t2   File "/home/user/.local/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
[2019-04-30 17:37:11,095] {base_task_runner.py:101} INFO - Job 22: Subtask t2     raise AirflowException("Bash command failed")
[2019-04-30 17:37:11,095] {base_task_runner.py:101} INFO - Job 22: Subtask t2 airflow.exceptions.AirflowException: Bash command failed
[2019-04-30 17:37:15,446] {logging_mixin.py:95} INFO - [2019-04-30 17:37:15,445] {jobs.py:2562} INFO - Task exited with return code 1

Я пытался соединить все вместе, я не знаю, где главная ошибка, задание отлично работает на Talend, поэтому ошибка не в заданииобсуждаемый.Может быть, кто-то использовал это раньше и имеет рабочий пример.Спасибо за ваше время!

Ответы [ 2 ]

0 голосов
/ 11 мая 2019

Мы запускаем задания Talend, развернутые на проводнике заданий TAC из Airflow, вызывая скрипт MetaServlet. Таким образом, вы не только запускаете задания Talend из Airflow, но также можете использовать функции TAC для просмотра журналов, истории запусков, статистики в реальном времени и т. Д.,

tac_job_run_command = "/opt/tomcat/webapps/tac/WEB-INF/classes/MetaServletCaller.sh --tac-url=http://xx.xxx.xx.xxx:8080/tac/ --json-params='{"actionName":"runTask","authPass":"Password123","authUser":"talend@talend.com","taskId":123,"mode":"synchronous"}' "

talend_job_task = SSHOperator(
    task_id='talend_job_task',
    ssh_conn_id='talend_tac_server',
    command=tac_job_run_command,
    do_xcom_push=True,
    dag=dag)

Мы используем SSHOperator для подключения к TAC-серверу и запускаем сценарий метасервлета с необходимыми учетными данными и параметрами. Я бы предложил почитать блог Эдварда Оста по использованию метасервлета (http://edwardost.github.io/talend/di/2015/05/28/Using-the-TAC-API/)

Примечание : я создал соединение Airflow с именем "talend_tac_server" с учетными данными для подключения к моему серверу TAC.

0 голосов
/ 02 мая 2019

Когда вы создаете задание в Talend Studio, оно генерирует файлы .sh (для * nix) и .bat (для Windows), а также файл .jar. Вы захотите вызвать / выполнить .sh, а не .jar файл напрямую, поскольку он будет содержать настройки Java, которые вы установили в Talend.

Так что ваш bash_command должен быть примерно таким bash_command = 'bash /home/user/Images/JObforAirflow/test/jobbatch.sh'.

Имейте в виду, что вам может потребоваться разрешить выполнение .sh, т.е. chmod +x file.sh. Также, возможно, стоит выполнить файл .sh из терминала (не в Airflow), чтобы убедиться, что он успешно работает вне Talend Studio.

...