Как индивидуально запустить задачу отдельно в потоке воздуха? - PullRequest
0 голосов
/ 11 февраля 2020

У меня есть список таблиц, через которые я хочу запустить свой скрипт. Он работает успешно, когда я делаю по одной таблице за раз, но когда я пытаюсь на 1 oop выше задач, он запускает все таблицы одновременно, выдавая мне несколько ошибок.

Вот мой код:

def create_tunnel_postgres():

    psql_host = ''
    psql_port = 5432
    ssh_host= ''
    ssh_port = 22
    ssh_username = ''
    pkf = paramiko.RSAKey.from_private_key(StringIO(Variable.get('my_key')))

    server = SSHTunnelForwarder(
        (ssh_host, 22),
        ssh_username=ssh_username,
        ssh_private_key=pkf,
        remote_bind_address=(psql_host, 5432))

    return server

def conn_postgres_internal(server):
    """
    Using the server connect to the internal postgres
    """
    conn = psycopg2.connect(
        database='pricing',
        user= Variable.get('postgres_db_user'),
        password= Variable.get('postgres_db_key'),
        host=server.local_bind_host,
        port=server.local_bind_port,
    )

    return conn

def gzip_postgres_table(**kwargs):
    """

    path='/path/{}.csv'.format(table_name)
    server_postgres = create_tunnel_postgres()
    server_postgres.start()
    etl_conn = conn_postgres_internal(server_postgres)
    cur=etl_conn.cursor()
    cur.execute("""
        select * from schema.db.{} limit 100;
        """.format(table_name))
    result = cur.fetchall()
    column_names = [i[0] for i in cur.description]
    fp = gzip.open(path, 'wt')
    myFile = csv.writer(fp,delimiter=',')
    myFile.writerow(column_names)
    myFile.writerows(result)
    fp.close()
    etl_conn.close()
    server_postgres.stop()


#------------------------------------------------------------------------------------------------------------------------------------------------

default_args = {
    'owner': 'mae',
    'depends_on_past':False,
    'start_date': datetime(2020,1,1),
    'email': ['maom@aol.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}


tables= ['table1','table2']
s3_folder='de'
current_timestamp=datetime.now()



#Element'S VARIABLES

dag = DAG('dag1',
          description = 'O',
          default_args=default_args,
          max_active_runs=1,
          schedule_interval= '@once',
          #schedule_interval='hourly'
          catchup = False )


for table_name in pricing_table_name:
    t1 = PythonOperator(
        task_id='{}_gzip_table'.format(table_name),
        python_callable= gzip_postgres_table,
        provide_context=True,
        op_kwargs={'table_name':table_name,'s3_folder':s3_folder,'current_timestamp':current_timestamp},
        dag = dag)

Есть ли способ сначала запустить table1 ... пусть он завершится sh, а затем запустить таблицу 2? Я попытался сделать это с помощью for table_name в таблицах: но безрезультатно. Любые идеи или предложения помогут.

Ответы [ 4 ]

1 голос
/ 13 февраля 2020

Мне нужно, чтобы DAG вот так enter image description here

Код для этого:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

import sys
sys.path.append('../')
from mssql_loader import core #program code, which start load 
from mssql_loader import locals #local variables, contains dictionaries with name
def contact_load(typ,db):

    core.starter(typ=typ,db=db)
    return 'MSSQL LOADED '+db['DBpseudo']+'.'+typ

dag = DAG('contact_loader', description='MSSQL sqlcontact.uka.local loader to GBQ',
          schedule_interval='0 7 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

start_operator = DummyOperator(task_id='ROBO_task', retries=3, dag=dag)


for v in locals.TABLES:
    for db in locals.DB:        
        task = PythonOperator(
            task_id=db['DBpseudo']+'_mssql_' + v, #create Express_mssql_fast , UKA_mssql_important and etc
            python_callable=contact_load,
            op_kwargs={'typ': v,'db':db},
            retries=3,
            dag=dag,
        )

        start_operator >> task #create parent-child connection to from first task to other
1 голос
/ 13 февраля 2020

Я видел ваш код, и кажется, что вы создаете несколько задач DAG с помощью оператора зацикливания, который запускает задачу параллельно.

Существуют определенные способы удовлетворения ваших требований.

  1. использовать sequential_executor.

airflow.executors.sequential_executor.SequentialExecutor which will only run task instances sequentially.

https://airflow.apache.org/docs/stable/start.html#quick -старт

создайте скрипт, который работает в соответствии с вашими потребностями.

Создайте скрипт (Python) и используйте его в качестве PythonOperator, который повторяет вашу текущую функцию для числа таблиц.

ограничить исполнителей воздушного потока (параллелизм) до 1.

Вы можете ограничить своих рабочих воздушных потоков до 1 в файле конфигурации airflow.cfg.

Шаги :

открыть airflow.cfg из вашего воздушного потока root (AIRFLOW_HOME).

установить / обновить parallelism = 1

перезапустить ваш воздушный поток.

это должно работать.

1 голос
/ 13 февраля 2020

Я вижу 3 способа решения этой проблемы.

1 голос
/ 11 февраля 2020

Ваш for создает несколько задач для обработки ваших таблиц, это будет распараллеливать выполнение задач по умолчанию в потоке воздуха.

Вы можете установить число рабочих в файле конфигурации воздушного потока на 1 или создать только 1 задачу и запустить l oop внутри задачи, которая затем будет выполняться синхронно .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...