Как создать динамическую строку в Airflow - PullRequest
0 голосов
/ 04 октября 2018

У меня есть следующий рабочий процесс:

  1. Получить номер из MySqlOperator (динамический)
  2. Получить значение, хранящееся в переменной (статическая)
  3. Создать строку на основеоба.
  4. Используйте строку как команду sql для MySqlToGoogleCloudStorageOperator.

Теперь это оказалось трудным делом.

Это мой код:

VALUE_FROM_VARIABLE = Variable.get("my_var")

query = 'SELECT ... FROM orders where orders_id>{0}
          and orderid<{1};'.format(VALUE_FROM_MySqlOperator, VALUE_FROM_VARIABLE)


file_name   = ...
import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders_and_upload_to_storage',
    mysql_conn_id='mysql_con',
    google_cloud_storage_conn_id='gcp_con',
    sql=query,
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Моя проблема в том, что я не могу получить доступ к MySqlOperator XCOM, в котором хранится номер, необходимый для запроса.

Поэтому я попытался получить к нему доступ в PythonOperator и построить строку запроса следующим образом:

def func(ds, **kwargs):
    ti = kwargs['ti']
    VALUE_FROM_MySqlOperator = str(ti.xcom_pull(task_ids='mySQL_task'))  # get the XCOM of MySqlOperator
    query = 'SELECT ... FROM orders where orders_id>{0}
              and orderid<{1};'.format(VALUE_FROM_MySqlOperator, VALUE_FROM_VARIABLE)
   return query


py_op = PythonOperator(
    task_id='py_op_task',
    provide_context=True,
    python_callable=func,
    xcom_push=True,
    dag=dag)

Но теперь я не могу передать новый сгенерированный запрос в MySqlToGoogleCloudStorageOperator, потому что не могу прочитатьXCOM внутри этого оператора.

Как мне выйти из этого?

1 Ответ

0 голосов
/ 04 октября 2018

Операторы SQL намереваются выполнять запросы, которые не возвращают никаких значений.Вы можете использовать такие операторы (например) для перемещения данных из рабочей таблицы в рабочую.

На мой взгляд, старайтесь избегать создания рабочих процессов, использующих XCOMS.

Если вам нужно запрашивать данныеиз базы данных вы можете использовать Крючки и Соединения

Непроверенный код ниже

VALUE_FROM_VARIABLE = Variable.get("my_var")
query_to_retrieve = "SELECT item FROM table"
from airflow.hooks.mysql_hook import MySqlHook
#here we importing hook, using connection and get first row
VALUE_FROM_MySQL = MySqlHook(mysql_conn_id='mysql_default').get_first(query_to_retrieve)[0]

query = 'SELECT ... FROM orders where orders_id>{0}
      and orderid<{1};'.format(VALUE_FROM_MySQL, VALUE_FROM_VARIABLE)
...