Как сохранить sql вывод в pandas фрейм данных с помощью Airflow? - PullRequest
6 голосов
/ 02 мая 2020

Я хочу сохранить данные из SQL в Pandas фрейм данных и выполнить некоторые преобразования данных, а затем загрузить их в другую таблицу с помощью воздушного потока

Проблема, с которой я сталкиваюсь, заключается в том, что строка подключения к таблицам имеет только accessbale через воздушный поток. Поэтому мне нужно использовать поток воздуха в качестве носителя для чтения и записи данных.

Как это можно сделать?

МОЙ код

Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip limit 5 ",
    params={'limit': '50'},
    dag=dag

выходные данные задачи должны быть сохранены в фрейме данных (df) и после преобразований и загружены обратно в другую таблицу.

Как это можно сделать?

1 Ответ

0 голосов
/ 02 мая 2020

Я сомневаюсь, что для этого есть встроенный оператор. Вы можете легко написать пользовательский оператор

  • Расширить PostgresOperator или просто BaseOperator / любой другой оператор по вашему выбору. Весь пользовательский код переходит в переопределенный execute() метод
  • Затем используйте PostgresHook, чтобы получить Pandas DataFrame вызывая get_pandas_df() функцию
  • Выполните любые преобразования, которые вам нужно сделать, в вашей pandas df
  • Наконец используйте insert_rows() функцию для вставки данных в таблицу

UPDATE-1

По запросу я добавляю код для оператора

from typing import Dict, Any, List, Tuple

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame


class MyCustomOperator(PostgresOperator):

    @apply_defaults
    def __init__(self, destination_table: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.destination_table: str = destination_table

    def execute(self, context: Dict[str, Any]):
        # create PostgresHook
        self.hook: PostgresHook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                               schema=self.database)
        # read data from Postgres-SQL query into pandas DataFrame
        df: DataFrame = self.hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # perform transformations on df here
        df['column_to_be_doubled'] = df['column_to_be_doubled'].multiply(2)
        ..
        # convert pandas DataFrame into list of tuples
        rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
        # insert list of tuples in destination Postgres table
        self.hook.insert_rows(table=self.destination_table, rows=rows)

Примечание. Фрагмент приведен только для справки; он НЕ был протестирован

Ссылки

Дальнейшие модификации / улучшения

  • Параметр destination_table можно прочитать из Variable
  • Если таблица назначения не обязательно находится в той же схеме Postgres затем мы можем взять другой параметр, например destination_postgres_conn_id в __init__, и использовать его для создания destination_hook, для которого мы можем вызвать insert_rows метод
...