Запуск сценария SQL на стороне источника для нескольких таблиц в одном задании на клей с соответствующим соглашением об именах таблиц для S3 - PullRequest
0 голосов
/ 30 марта 2020
sql_list = ['(select * from table1 where rownum <= 100) alias1','(select * from table2 where rownum <= 100) alias2']

for sql_statement in sql_list: df = spark.read.format("jdbc").option("driver", jdbc_driver_name).option("url", db_url).option("dbtable", sql_statement).option("user", db_username).option("password", db_password).option("fetchSize", 100000).load()

df.write.format("parquet").mode("overwrite").save("s3://s3-location/" + sql_statement)

Источником был Oracle DB

. Мне удалось запустить массив запросов и сохранить его на S3 в паркетной памяти, но использовалось такое же наименование, как и в sql_list, Я хотел бы сохранить данные на S3 с именами как alias1 и alias2 соответственно.

1 Ответ

0 голосов
/ 01 апреля 2020

Подумайте об использовании словаря вместо списка, так как он более аккуратный и гибкий.

    sql_list = {'alias1':'(select * from table1 where rownum <= 100) alias1',
                'alias2': '(select * from table2 where rownum <= 100) alias2'}

    for table,sql_statement in sql_list.items():
        df = spark.read.format("jdbc").option("driver", jdbc_driver_name)\
            .option("url",db_url)\
            .option("dbtable", sql_statement)\
            .option("user", db_username)\
            .option("password", db_password)\
            .option("fetchSize",100000).load()

        df.write.format("parquet").mode("overwrite").save("s3://s3-location/" + table)

В противном случае вам понадобится выполнить грязное разбиение

df.write.format("parquet").mode("overwrite").save("s3://s3-location/" + sql_statement.split(' ')[-1])
...