Код итерации Python / Pyspark (для задания AWS Glue ETL) - PullRequest
0 голосов
/ 28 мая 2020

Я использую AWS Glue, и вы не можете читать / записывать несколько динамических c кадров без использования итерации. Я сделал этот код ниже, но борюсь с двумя вещами:

  1. Является ли «tableName», т.е. отфильтрованный список таблиц правильный (все таблицы, которые я хочу перебрать, начиная с client_historical_ *).
  2. Я застрял на том, как динамически заполнять имя таблицы Redshift, используя приведенное ниже сопоставление.

Сопоставления Redshift:

client_historical_ks --> table_01_a
client_historical_kg --> table_01_b
client_historical_kt --> table_01_c
client_historical_kf --> table_01_d

Код:

client = boto3.client('glue',region_name='us-east-1')

databaseName = 'incomingdata'
tables = client.get_tables(DatabaseName = databaseName)
tableList = tables['TableList']

for table in tableList:
    start_prefix = client_historical_
    tableName = list(filter(lambda x: x.startswith(start_prefix), table['Name']))
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "incomingdata", table_name = tableName, transformation_ctx = "datasource0")
    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "Redshift", connection_options = {"dbtable": "nameoftablehere", "database": "metadata"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

1 Ответ

0 голосов
/ 29 мая 2020

Вы можете создать словарь сопоставления, а затем выполнить свой код. Вы также можете отфильтровать таблицы за пределами l oop, а затем l oop только по требуемым таблицам.

mapping = {'client_historical_ks': 'table_01_a',
'client_historical_kg': 'table_01_b',
'client_historical_kt': 'table_01_c',
'client_historical_kf': 'table_01_d'}

client = boto3.client('glue',region_name='us-east-1')

databaseName = 'incomingdata'
tables = client.get_tables(DatabaseName = databaseName)
tableList = tables['TableList']
start_prefix = 'client_historical_'
tableNames = list(filter(lambda x: x.startswith(start_prefix), table['Name']))

for table in tableNames:
    target_table = mapping.get(table)
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "incomingdata", table_name = table, transformation_ctx = "datasource0")
    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "Redshift", connection_options = {"dbtable": target_table, "database": "metadata"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

...