Склеить Job, чтобы объединить датафреймы с помощью pyspark? - PullRequest
0 голосов
/ 31 октября 2019

Я в основном пытаюсь обновить / добавить строки из одного DF в другой. Вот мой код:

# S3
import boto3

# SOURCE
source_table = "someDynamoDbtable"
source_s3 = "s://mybucket/folder/"

# DESTINATION
destination_bucket = "s3://destination-bucket"

#Select which attributes to update/add
params = ['attributeD', 'attributeF', 'AttributeG']

#spark wrapper
glueContext = GlueContext(SparkContext.getOrCreate())

newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()

oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()

#makes a union of the dataframes
rebuildData = oldDF.union(newDF)
#error happens here
readyData = DynamicFrame.fromDF(rebuildData, glueContext, "readyData")

#writes new data to s3 destination, into orc files, while partitioning
glueContext.write_dynamic_frame.from_options(frame = readyData, connection_type = "s3", connection_options = {"path": destination_bucket}, format = "orc", partitionBy=['partition_year', 'partition_month', 'partition_day'])

Я получаю ошибку: SyntaxError: неверный синтаксис в строке readyData = ... Пока что я понятия не имеючто не так и у меня болит голова от всего начинающего с пустого кода. Пожалуйста, если вы видели что-нибудь подобное, бросьте мне кость.

Ответы [ 2 ]

0 голосов
/ 01 ноября 2019

Да, поэтому я решил, что для того, что мне нужно сделать, лучше использовать OUTER JOIN. Позвольте мне объяснить:

  • Я загружаю два кадра данных, где один отбрасывает поля, которые мы хотим обновить.
  • Второй выбирает только эти поля, поэтому у обоих не будет дублирующихся строк / столбцов.
  • Вместо объединения, которое будет просто добавлять строки, мы используем внешнее (или полное) соединение. Это добавить все данные из моих фреймов данных без дубликатов.

Теперь моя логика может быть ошибочной, но пока она работает хорошо для меня. Если кто-то ищет подобное решение, добро пожаловать к нему. Мой измененный код:

rebuildData = oldDF.join(newData, 'id', 'outer')
0 голосов
/ 31 октября 2019

Вы выполняете операцию объединения между фреймом данных и динамическим фреймом.

Это создает динамический фрейм с именем newData и фрейм данных с именем newDF :

newData = glueContext.create_dynamic_frame.from_options(connection_type = "dynamodb", connection_options = {"tableName": source_table})
newValues = newData.select_fields(params)
newDF = newValues.toDF()

Это создает динамический кадр с именем oldData и информационный кадр с именем oldDF :

oldData = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [source_s3]}, format="orc", format_options={}, transformation_ctx="dynamic_frame")
oldDataValues = oldData.drop_fields(params)
oldDF = oldDataValues.toDF()

И вы выполняете операцию объединения над двумя объектами какниже:

rebuildData = oldDF.union(newData)

, который должен быть:

rebuildData = oldDF.union(newDF)
...