Запустите блокнот Databricks с кодом PySpark, используя Apache Airflow - PullRequest
1 голос
/ 25 июня 2019

Я использую Airflow, Databricks и PySpark. Я хотел бы знать, возможно ли добавить дополнительные параметры, когда я хочу запустить Блокнот данных через Airflow.

У меня был следующий код в Python с именем MyETL:

def main(**kwargs):
      spark.sql("CREATE TABLE {0} {1}".format(table, columns))
      print("Running my ETL!")

    if __name__== "__main__":
      main(arg1, arg2)

Я хочу определить другие параметры задачи, которые запускают записную книжку Databricks с большим количеством параметров, я хочу добавить название метода и параметры этих методов. Например, когда я хочу зарегистрировать задачи в DAG в Airflow:

   notebook_task_params = {
        'new_cluster': new_cluster,
        'notebook_task': {
            'notebook_path': '/Users/airflow@example.com/MyETL',
            'method_name': 'main',
            'params':'[{'table':'A'},{'columns':['a', 'b']}]'
        },
    }

Я не знаю, возможно ли это, потому что я не нашел подобных примеров.

# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)

Другими словами, я хочу выполнить записную книжку с параметрами, используя Airflow. Мой вопрос, как я могу это сделать?

1 Ответ

2 голосов
/ 25 июня 2019

Вы также можете добавить method_name как params, а затем проанализировать свою логику на ноутбуке.

Тем не менее, более распространенный способ - убедиться, что метод уже установлен в вашем кластере.

params = '[{'table':'A'},{'columns':['a', 'b']}]'

Затем в вашей записной книжке на кирпичи данных:

table = getArgument("table", "DefaultValue")
columns = getArgument("columns", "DefaultValue")

result = method(table, columns)

Вы также узнаете, доступны ли параметры с помощью getArgument(), если вывы можете увидеть ваши параметры (изображение прилагается выше) в вашем рабочем задании.

enter image description here

...