У меня есть список таблиц, через которые я хочу запустить свой скрипт. Он работает успешно, когда я делаю по одной таблице за раз, но когда я пытаюсь на 1 oop выше задач, он запускает все таблицы одновременно, выдавая мне несколько ошибок.
Вот мой код:
def create_tunnel_postgres():
psql_host = ''
psql_port = 5432
ssh_host= ''
ssh_port = 22
ssh_username = ''
pkf = paramiko.RSAKey.from_private_key(StringIO(Variable.get('my_key')))
server = SSHTunnelForwarder(
(ssh_host, 22),
ssh_username=ssh_username,
ssh_private_key=pkf,
remote_bind_address=(psql_host, 5432))
return server
def conn_postgres_internal(server):
"""
Using the server connect to the internal postgres
"""
conn = psycopg2.connect(
database='pricing',
user= Variable.get('postgres_db_user'),
password= Variable.get('postgres_db_key'),
host=server.local_bind_host,
port=server.local_bind_port,
)
return conn
def gzip_postgres_table(**kwargs):
"""
path='/path/{}.csv'.format(table_name)
server_postgres = create_tunnel_postgres()
server_postgres.start()
etl_conn = conn_postgres_internal(server_postgres)
cur=etl_conn.cursor()
cur.execute("""
select * from schema.db.{} limit 100;
""".format(table_name))
result = cur.fetchall()
column_names = [i[0] for i in cur.description]
fp = gzip.open(path, 'wt')
myFile = csv.writer(fp,delimiter=',')
myFile.writerow(column_names)
myFile.writerows(result)
fp.close()
etl_conn.close()
server_postgres.stop()
#------------------------------------------------------------------------------------------------------------------------------------------------
default_args = {
'owner': 'mae',
'depends_on_past':False,
'start_date': datetime(2020,1,1),
'email': ['maom@aol.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1)
}
tables= ['table1','table2']
s3_folder='de'
current_timestamp=datetime.now()
#Element'S VARIABLES
dag = DAG('dag1',
description = 'O',
default_args=default_args,
max_active_runs=1,
schedule_interval= '@once',
#schedule_interval='hourly'
catchup = False )
for table_name in pricing_table_name:
t1 = PythonOperator(
task_id='{}_gzip_table'.format(table_name),
python_callable= gzip_postgres_table,
provide_context=True,
op_kwargs={'table_name':table_name,'s3_folder':s3_folder,'current_timestamp':current_timestamp},
dag = dag)
Есть ли способ сначала запустить table1 ... пусть он завершится sh, а затем запустить таблицу 2? Я попытался сделать это с помощью for table_name в таблицах: но безрезультатно. Любые идеи или предложения помогут.