Использование Sqoop для перемещения таблиц из MySQL в Hive по расписанию с использованием удаленно установленного Airflow - PullRequest
0 голосов
/ 01 июля 2018

У меня есть сценарий, в котором у меня есть настройка AWS EMR с несколькими приложениями, такими как Spark, Hadoop, Hive, HCatalog, Zeppelin, Sqoop и т. Д. И у меня есть другой сервер, на котором работает только Airflow.

Я работаю над требованием, в котором я хочу переместить таблицы MySQL (который снова находится в другом экземпляре RDS) в Hive, используя Sqoop, и этот триггер должен быть отправлен Airflow.

Возможно ли добиться этого с помощью SqoopOperator, доступного в Airflow, если Airflow находится на удаленном сервере? Я не верю, тогда есть ли другой способ достичь этого?

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 20 июля 2018

Попробуйте добавить шаг, используя script-runner.jar. здесь больше .

aws emr create-cluster --name "Test cluster" –-release-label emr-5.16.0 --applications Name=Hive Name=Pig --use-default-roles --ec2-attributes KeyName=myKey --instance-type m4.large --instance-count 3 --steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://region.elasticmapreduce/libs/script-runner/script-runner.jar,Args=["s3://mybucket/script-path/my_script.sh"]

Тогда вы можете сделать это как это .

SPARK_TEST_STEPS = [
    {
        'Name': 'demo',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'cn-northwest-1.elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args': [
                "s3://d.s3.d.com/demo.sh",
            ]
        }
    }
]

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

step_adder.set_downstream(step_checker)

demo.sh вот так.

!/bin/bash    
sqoop import --connect jdbc:mysql://mysql.example.com/testDb --username root --password hadoop123 --table student
0 голосов
/ 04 июля 2018

Да, это возможно. Я признаю, что документация о том, как использовать операторы, отсутствует, но если вы понимаете концепцию hooks и операторов в Airflow, вы можете понять это, прочитав код оператор, который вы хотите использовать. В этом случае вам нужно прочитать кодовую базу SqoopHook и SqoopOperator . Большая часть того, что я знаю, как сделать с Airflow, получается из чтения кода, хотя я не использовал этот оператор, я могу попытаться помочь вам здесь как можно лучше.

Предположим, вы хотите выполнить эту команду sqoop:

sqoop import --connect jdbc:mysql://mysql.example.com/testDb --username root --password hadoop123 --table student 

И у вас есть сервер Sqoop, работающий на удаленном хосте, к которому вы можете получить доступ с помощью Scoop-клиента в http://scoop.example.com:12000/sqoop/.

Во-первых, вам нужно создать соединение в пользовательском интерфейсе администратора воздушного потока, вызвать соединение sqoop. Для соединения заполните host как scoop.example.com, schema как sqoop и port как 12000. Если у вас есть пароль, вам нужно будет поместить его в файл на вашем сервере и в extras заполнить строку json, которая выглядит как {'password_file':'/path/to/password.txt'} ( см. Встроенный код об этом файле паролей).

После того, как вы настроили соединение в пользовательском интерфейсе, теперь можете создать задачу, используя SqoopOperator в вашем DAG-файле. Это может выглядеть так:

sqoop_mysql_export = SqoopOperator(conn_id='sqoop',
                                   table='student',
                                   username='root',
                                   password='password',
                                   driver='jdbc:mysql://mysql.example.com/testDb',
                                   cmd_type='import')

Полный список параметров, которые вы можете передать для импорта, вы можете найти в коде здесь .

Вы можете видеть, как SqoopOperator (и действительно SqoopHook, который оператор использует для подключения к Sqoop) переводит эти аргументы в команды командной строки здесь .

Действительно, этот SqoopOperator просто работает, переводя kwargs, которые вы передаете, в команды CLI клиента sqoop. Если вы посмотрите SqoopHook, вы сможете увидеть, как это делается, и, возможно, выяснить, как заставить его работать в вашем случае. Удачи!

Для устранения неполадок я бы порекомендовал SSHing на сервер, на котором запущен Airflow, и подтвердил, что вы можете запустить клиент Scoop из командной строки и подключиться к удаленному серверу Scoop.

...