Объединить две колонки, но с разной структурой в улье - PullRequest
1 голос
/ 13 июня 2019

Я загрузил файл паркета и создал фрейм данных, как показано ниже

----------------------------------------------------------------------
 time |  data1                | data2
-----------------------------------------------------------------------
1-40  | [ lion-> 34, bear -> 2 ] |  [ monkey -> [9,23], goose -> [4,5] ]

Итак, тип данных столбца data1 - string->integer map, где тип данных столбца data2 - string->array map.

Я хочу разбить вышеуказанный фрейм данных на структуру ниже

------------------------
time | key        | val
------------------------
1-40 | lion       | 34
1-40 | bear       | 2
1-40 | monkey_0   | 9
1-40 | monkey_1   | 23
1-40 | goose_0    | 4
1-40 | goose_1    | 5

Я пытался преобразовать data1 и data2 в тот же тип данных, что и string->array, используя udfs в pyspark, а затем взорвал столбец, как показано ниже

def to_map(col1, col2):
    for i in col1.keys():
        col2[i] = [col1[i]]
    return col2
caster= udf(to_map,MapType(StringType(),ArrayType(IntegerType())))
pm_df = pm_df.withColumn("animals", caster('data1', 'data2'))
pm_df.select('time',explode(col('animals')))

Я также пытался использовать hive sql, предполагая, что hive sql обладает большей производительностью, чем использование пользовательских функций pyspark.

rdd = spark.sparkContext.parallelize([[datetime.datetime.now(), {'lion': 34, 'bear': 2}, {'monkey': [9, 23], 'goose':[4,5]} ]])
df = rdd.toDF(fields)
df.createOrReplaceTempView("df")
df = spark.sql("select time, explode(data1), data2 from df")
df.createOrReplaceTempView("df")
df = spark.sql("select time,key as animal,value,posexplode(data2) from df").show(truncate=False)

Но я застрял с приведенным ниже результатом и не знаю, как объединить разделенные столбцы в соответствии с моим требованием. Выход вышеупомянутого куста sql:

+--------------------------+------+-----+---+------+-------+
|time                      |animal|value|pos|key   |value  |
+--------------------------+------+-----+---+------+-------+
|2019-06-12 19:23:00.169739|bear  |2    |0  |goose |[4, 5] |
|2019-06-12 19:23:00.169739|bear  |2    |1  |monkey|[9, 23]|
|2019-06-12 19:23:00.169739|lion  |34   |0  |goose |[4, 5] |
|2019-06-12 19:23:00.169739|lion  |34   |1  |monkey|[9, 23]|
+--------------------------+------+-----+---+------+-------+

Я знаю, что при использовании python udfs возникает много накладных расходов на связь между процессором python и JVM. Есть ли способ достичь ожидаемого результата с помощью встроенных функций или куста sql.

1 Ответ

2 голосов
/ 13 июня 2019

Я бы обработал data1 и data2 отдельно, а затем union resultset:

from pyspark.sql import functions as F

df1 = df.select('time', F.explode('data1').alias('key', 'value'))
>>> df1.show()
#+--------------------+----+-----+
#|                time| key|value|
#+--------------------+----+-----+
#|2019-06-12 20:19:...|bear|    2|
#|2019-06-12 20:19:...|lion|   34|
#+--------------------+----+-----+

df2 = df.select('time', F.explode('data2').alias('key', 'values')) \
        .select('time', 'key', F.posexplode('values').alias('pos','value')) \
        .select('time', F.concat('key', F.lit('_'), 'pos').alias('key'), 'value')
>>> df2.show()
#+--------------------+--------+-----+
#|                time|     key|value|
#+--------------------+--------+-----+
#|2019-06-12 20:19:...| goose_0|    4|
#|2019-06-12 20:19:...| goose_1|    5|
#|2019-06-12 20:19:...|monkey_0|    9|
#|2019-06-12 20:19:...|monkey_1|   23|
#+--------------------+--------+-----+

df_new = df1.union(df2)
...