AWS Glue PySpark: разделение словаря в виде строки на несколько строк - PullRequest
1 голос
/ 05 апреля 2019

Я имею дело с большим набором данных, где мои записи имеют следующую форму

uniqueId col1 col2 col3  Levels
1    A1   A2   A3    {"2019-01-01":1.1 ,"2019-01-02":2.1 ,"2019-01-03":3.1}
2    B1   B2   B3    {"2019-01-01":1.2 ,"2019-01-03":3.2}
3    C1   C2   C3    {"2019-01-04":4.3}

'Уровни' хранятся в виде строкового типа.

Я пытаюсь разбить Levels на строки, чтобы получить вывод, подобный этому:

uniqueId col1 col2 col3 date        value
1        A1   A2   A3   2019-01-01  1.2
1        A1   A2   A3   2019-01-02  2.1
1        A1   A2   A3   2019-01-03  3.1
2        B1   B2   B3   2019-01-01  1.2
2        B1   B2   B3   2019-01-03  3.2
3        C1   C2   C3   2019-01-04  4.3

Я пытаюсь применить скрипт для AWS Glue в Pyspark, следуя предложенному здесь решению,

PySpark "взорвать" диктат в столбце

@udf("map<string, string>")
def parse(s):
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        pass 

parse_udf = udf(parse) 



datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "table", transformation_ctx = "datasource0")

sparkDF = datasource0.toDF() 

sparkDF2 = sparkDF.select("unique_id","col1","col2", "col3", explode(parse("levels")).alias("date", "value"))



GlueDF_tmp = DynamicFrame.fromDF(sparkDF2, glueContext, 'GlueDF_tmp')

GlueDF = GlueDF_tmp.apply_mapping([("unique_id", "string", "unique_id", "string"),
        ("col1", "string", "col1", "string"),
        ("col2", "string", "col2", "string"),
        ("col3", "string", "col3", "string"),
        ("date", "timestamp", "date", "timestamp"),
        ("value", "double", "value", "double")])


glueContext.write_dynamic_frame.from_options(frame = GlueDF, connection_type = "s3", 
     connection_options = {"path": "s3://..."}, 
     format = "parquet", 
     transformation_ctx = "datasink0")

Однако я сталкиваюсь с такими проблемами с памятью Клей AWS - невозможно установить spark.yarn.executor.memoryOverhead

Какой лучший / более эффективный способ сделать разделение?

...