Я сомневаюсь, что для этого есть встроенный оператор. Вы можете легко написать пользовательский оператор
- Расширить
PostgresOperator
или просто BaseOperator
/ любой другой оператор по вашему выбору. Весь пользовательский код переходит в переопределенный execute()
метод - Затем используйте
PostgresHook
, чтобы получить Pandas
DataFrame
вызывая get_pandas_df()
функцию - Выполните любые преобразования, которые вам нужно сделать, в вашей
pandas
df
- Наконец используйте
insert_rows()
функцию для вставки данных в таблицу
UPDATE-1
По запросу я добавляю код для оператора
from typing import Dict, Any, List, Tuple
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame
class MyCustomOperator(PostgresOperator):
@apply_defaults
def __init__(self, destination_table: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.destination_table: str = destination_table
def execute(self, context: Dict[str, Any]):
# create PostgresHook
self.hook: PostgresHook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
schema=self.database)
# read data from Postgres-SQL query into pandas DataFrame
df: DataFrame = self.hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
# perform transformations on df here
df['column_to_be_doubled'] = df['column_to_be_doubled'].multiply(2)
..
# convert pandas DataFrame into list of tuples
rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
# insert list of tuples in destination Postgres table
self.hook.insert_rows(table=self.destination_table, rows=rows)
Примечание. Фрагмент приведен только для справки; он НЕ был протестирован
Ссылки
Дальнейшие модификации / улучшения
- Параметр
destination_table
можно прочитать из Variable
- Если таблица назначения не обязательно находится в той же схеме
Postgres
затем мы можем взять другой параметр, например destination_postgres_conn_id
в __init__
, и использовать его для создания destination_hook
, для которого мы можем вызвать insert_rows
метод