Я загрузил файл паркета и создал фрейм данных, как показано ниже
----------------------------------------------------------------------
time | data1 | data2
-----------------------------------------------------------------------
1-40 | [ lion-> 34, bear -> 2 ] | [ monkey -> [9,23], goose -> [4,5] ]
Итак, тип данных столбца data1 - string->integer
map, где тип данных столбца data2 - string->array
map.
Я хочу разбить вышеуказанный фрейм данных на структуру ниже
------------------------
time | key | val
------------------------
1-40 | lion | 34
1-40 | bear | 2
1-40 | monkey_0 | 9
1-40 | monkey_1 | 23
1-40 | goose_0 | 4
1-40 | goose_1 | 5
Я пытался преобразовать data1 и data2 в тот же тип данных, что и string->array
, используя udfs в pyspark, а затем взорвал столбец, как показано ниже
def to_map(col1, col2):
for i in col1.keys():
col2[i] = [col1[i]]
return col2
caster= udf(to_map,MapType(StringType(),ArrayType(IntegerType())))
pm_df = pm_df.withColumn("animals", caster('data1', 'data2'))
pm_df.select('time',explode(col('animals')))
Я также пытался использовать hive sql, предполагая, что hive sql обладает большей производительностью, чем использование пользовательских функций pyspark.
rdd = spark.sparkContext.parallelize([[datetime.datetime.now(), {'lion': 34, 'bear': 2}, {'monkey': [9, 23], 'goose':[4,5]} ]])
df = rdd.toDF(fields)
df.createOrReplaceTempView("df")
df = spark.sql("select time, explode(data1), data2 from df")
df.createOrReplaceTempView("df")
df = spark.sql("select time,key as animal,value,posexplode(data2) from df").show(truncate=False)
Но я застрял с приведенным ниже результатом и не знаю, как объединить разделенные столбцы в соответствии с моим требованием. Выход вышеупомянутого куста sql:
+--------------------------+------+-----+---+------+-------+
|time |animal|value|pos|key |value |
+--------------------------+------+-----+---+------+-------+
|2019-06-12 19:23:00.169739|bear |2 |0 |goose |[4, 5] |
|2019-06-12 19:23:00.169739|bear |2 |1 |monkey|[9, 23]|
|2019-06-12 19:23:00.169739|lion |34 |0 |goose |[4, 5] |
|2019-06-12 19:23:00.169739|lion |34 |1 |monkey|[9, 23]|
+--------------------------+------+-----+---+------+-------+
Я знаю, что при использовании python udfs возникает много накладных расходов на связь между процессором python и JVM. Есть ли способ достичь ожидаемого результата с помощью встроенных функций или куста sql.