Мы получаем файл CSV с несколькими миллионами. записи.
Ряд полей в этих записях имеет значение, которое ссылается на внешний ключ в таблице БД.
В процессе импорта (в таблицу БД) мы должны изменить эти значения по идентификатору в главной таблице.
У нас это работает, но мы не думаем, что делаем это оптимальным образом, потому что это действительно медленно и потребляет много памяти.
Это наш первый опыт работы с PySpark. Так что мы открыты для любых предложений :) 1009 *
Прямо сейчас наш скрипт PySpark подключается ко всем нашим основным таблицам, и мы объединяем исходные данные (s3 csv), после чего мы вставляем результат в нашу таблицу БД.
Код aprox:
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
origin = glueContext.create_dynamic_frame.from_catalog(database="glue-test-mysql", table_name="origin_csv")
# get master table data
scenario = glueContext.create_dynamic_frame.from_catalog(database = "glue-test-mysql", table_name = "master_table")
# map fields to output table
origin = ApplyMapping.apply(frame = origin, mappings = [("col0", "string", "Field_3703_aux", "string"), ("col1", "string", "Field_3704_aux", "string"), ("col2", "string", "Field_3705_aux", "string"), ("col3", "string", "Field_3706_aux", "string"), ("col4", "string", "Field_3707_aux", "string"), ("col5", "string", "Field_3708_aux", "string"), ("col6", "string", "Field_3826_aux", "string"), ("col7", "double", "Field_3711", "double"), ("col8", "string", "Field_3712", "string")], transformation_ctx = "origin")
# we keep only our desired fields
scenario = scenario.drop_fields(['Field_3618', 'Field_3620', 'Field_3624', 'Field_3625', 'Field_3626', 'Field_3798', 'Field_3808', 'Field_4397_d_5b27944147d26', 'Field_4398'])
# join master table, and swap Value by ID
origin = Join.apply(origin, scenario, 'Field_3703_aux', 'Field_3619').drop_fields(['Field_3619', 'Field_3703_aux']).rename_field('ID', 'Field_3703')
# insert to our DB Table
datasink4 = glueContext.write_dynamic_frame.from_catalog(frame=origin, name_space='glue-test-mysql', table_name='output_table')
job.commit()
Есть ли лучший способ сделать эти "соединения"?
Эти мастер-таблицы небольшие, поэтому мы можем хранить их в памяти / кэшировать.