Как отключить столбцы в ряды в скрипте AWS Glue / Py Spark - PullRequest
0 голосов
/ 04 января 2019

У меня есть большой вложенный документ json для каждого года (например, 2018, 2017), в котором собраны данные по каждому месяцу (январь-декабрь) и каждому дню (1-31).

{
 "2018" : {
    "Jan": {
        "1": {
            "u": 1,
            "n": 2
        }
        "2": {
            "u": 4,
            "n": 7
        }
    },
    "Feb": {
        "1": {
            "u": 3,
            "n": 2
        },
        "4": {
            "u": 4,
            "n": 5
        }
    }
 }
}

Я использовал функцию AWS Glue Relationalize.apply для преобразования вышеуказанных иерархических данных в плоскую структуру:

dfc = Relationalize.apply (frame = datasource0, staging_path = my_temp_bucket, name = my_ref_relationalize_table ,formation_ctx = "dfc")

Что дает мне таблицу со столбцами каждого элемента json, как показано ниже:

| 2018.Jan.1.u | 2018.Jan.1.n | 2018.Jan.2.u | 2018.Jan.1.n | 2018.Feb.1.u | 2018.Feb.1.n | 2018.Feb.2.u | 2018.Feb.1.n | 
| 1            |      2       |      4       |      7       |      3       |      2       |      4       |      5       | 

Как видите, в таблице будет много столбцов для каждого дня и каждого месяца.И я хочу упростить таблицу, преобразовав столбцы в строки, которые будут иметь таблицу ниже.

| year | month | dd | u | n | 
| 2018 | Jan   | 1  | 1 | 2 | 
| 2018 | Jan   | 2  | 4 | 7 |  
| 2018 | Feb   | 1  | 3 | 2 |  
| 2018 | Jan   | 4  | 4 | 5 |

В результате поиска я не смог получить правильный ответ.Есть ли решение AWS Glue / PySpark или какой-либо другой способ выполнить функцию unpivot для получения таблицы на основе строк из таблицы на основе столбцов?Можно ли это сделать в Афине?

1 Ответ

0 голосов
/ 08 января 2019

Реализованное решение, аналогичное приведенному ниже фрагменту

dataFrame = datasource0.toDF()
tableDataArray = [] ## to hold rows
rowArrayCount = 0
for row in dataFrame.rdd.toLocalIterator():
    for colName in dataFrame.schema.names:
        value = row[colName]
        keyArray = colName.split('.')
        rowDataArray = []
        rowDataArray.insert(0,str(id))
        rowDataArray.insert(1,str(keyArray[0]))
        rowDataArray.insert(2,str(keyArray[1]))
        rowDataArray.insert(3,str(keyArray[2]))
        rowDataArray.insert(4,str(keyArray[3]))
        tableDataArray.insert(rowArrayCount,rowDataArray)
    rowArrayCount=+1

unpivotDF = None
for rowDataArray in tableDataArray:
    newRowDF = sc.parallelize([Row(year=rowDataArray[0],month=rowDataArray[1],dd=rowDataArray[2],u=rowDataArray[3],n=rowDataArray[4])]).toDF()
    if unpivotDF is None:
        unpivotDF = newRowDF
    else :
        unpivotDF = unpivotDF.union(newRowDF)

datasource0 = datasource0.fromDF(unpivotDF, glueContext, "datasource0")

в приведенном выше newRowDF, также может быть создано, как показано ниже, если необходимо принудительно применить тип данных

columns = [StructField('year',StringType(), True),StructField('month', IntegerType(), ....]
schema = StructType(columns)
unpivotDF = sqlContext.createDataFrame(sc.emptyRDD(), schema)
for rowDataArray in tableDataArray:
    newRowDF = spark.createDataFrame(rowDataArray, schema)
...