Как изменить дочерний массив DynamicFrame в потоке карты PySpark - PullRequest
0 голосов
/ 11 июля 2019

У меня есть структура DynamicFrame, где наиболее сложным аспектом является ключ types, который указывает на array из structs. По какой-то причине я не могу Map для каждой записи и изменять ключ types. Как этот тип операции должен выполняться в AWS Glue и PySpark?

Когда я пишу Map функцию для преобразования types и записываю мой DynamicFrame в JSON, вывод всегда имеет ключ types в виде пустого массива, даже если я связываю его, делаю новый ключ, такой как blah_types (см. ниже).

Пример отдельной записи:

{
  "id": "1",
  "name": "rickroll",
  "types": [
    {"type": "basic", "id": "2"},
    {"type": "advanced", "id": "3"}
  ]
}
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "example", table_name = "example", transformation_ctx = "datasource0")

def MapRecord(rec):
    new_types = [t["id"] for t in rec["types"]]
    rec["types"] = new_types
    # or even
    rec["blah_types"] = new_types
    return rec

df_with_flat_types = Map.apply(frame = datasource0, f = MapRecord)

applymapping1 = ApplyMapping.apply(frame = df_with_flat_types, mappings = [("id", "string", "id", "string"),("types", "array", "types", "array") ], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://my_bucket/output"}, format = "json", transformation_ctx = "datasink2")
job.commit()

Я бы, по крайней мере, ожидал, что мои выходные записи будут иметь ключ blah_types со всеми подструктурами, которые изначально имел types.

...