Измените значение на ID из основной таблицы - PullRequest
0 голосов
/ 21 января 2019

Мы получаем файл CSV с несколькими миллионами. записи.

Ряд полей в этих записях имеет значение, которое ссылается на внешний ключ в таблице БД.

В процессе импорта (в таблицу БД) мы должны изменить эти значения по идентификатору в главной таблице.

У нас это работает, но мы не думаем, что делаем это оптимальным образом, потому что это действительно медленно и потребляет много памяти.

Это наш первый опыт работы с PySpark. Так что мы открыты для любых предложений :) 1009 *

Прямо сейчас наш скрипт PySpark подключается ко всем нашим основным таблицам, и мы объединяем исходные данные (s3 csv), после чего мы вставляем результат в нашу таблицу БД.

Код aprox:

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

origin = glueContext.create_dynamic_frame.from_catalog(database="glue-test-mysql", table_name="origin_csv")
# get master table data
scenario = glueContext.create_dynamic_frame.from_catalog(database = "glue-test-mysql", table_name = "master_table")

# map fields to output table
origin = ApplyMapping.apply(frame = origin, mappings = [("col0", "string", "Field_3703_aux", "string"), ("col1", "string", "Field_3704_aux", "string"), ("col2", "string", "Field_3705_aux", "string"), ("col3", "string", "Field_3706_aux", "string"), ("col4", "string", "Field_3707_aux", "string"), ("col5", "string", "Field_3708_aux", "string"), ("col6", "string", "Field_3826_aux", "string"), ("col7", "double", "Field_3711", "double"), ("col8", "string", "Field_3712", "string")], transformation_ctx = "origin")

# we keep only our desired fields
scenario = scenario.drop_fields(['Field_3618', 'Field_3620', 'Field_3624', 'Field_3625', 'Field_3626', 'Field_3798', 'Field_3808', 'Field_4397_d_5b27944147d26', 'Field_4398'])

# join master table, and swap Value by ID
origin = Join.apply(origin, scenario, 'Field_3703_aux', 'Field_3619').drop_fields(['Field_3619', 'Field_3703_aux']).rename_field('ID', 'Field_3703')

# insert to our DB Table
datasink4 = glueContext.write_dynamic_frame.from_catalog(frame=origin, name_space='glue-test-mysql', table_name='output_table')

job.commit()

Есть ли лучший способ сделать эти "соединения"? Эти мастер-таблицы небольшие, поэтому мы можем хранить их в памяти / кэшировать.

...