Проблема с родительской дочерней иерархией pyspark dataframe - PullRequest
1 голос
/ 10 июля 2020

В продолжение проблемы: pyspark dataframe withColumn Команда не работает

У меня есть входной фрейм данных: df_input (обновленный df_input)

|comment|inp_col|inp_val|
|11     |a      |a1     |
|12     |a      |a2     |
|12     |f      |&a     |
|12     |a      |f9     |
|15     |b      |b3     |
|16     |b      |b4     |
|17     |c      |&b     |
|17     |c      |c5     |
|17     |d      |&c     |
|17     |d      |d6     |
|17     |e      |&d     |
|17     |e      |e7     |

Если вы видите inp_col и inp_val имеют иерархию, и это может быть число n со значением root. Здесь родительские значения: "b" и "a" .

Теперь, согласно моему требованию, я должен заменить дочерние значения, начинающиеся с " & " к соответствующим значениям. Я пробовал перебирать список значений, начиная со значений '&' в столбце inp_val и заменяя их списком значений на каждой итерации. Но это не сработало. Я столкнулся с проблемой, как получить список со значениями родительского и дочернего списков.

проверенный код:

list_1 = [row['inp_val'] for row in tst.select(tst.inp_val).where(tst.inp_val.substr(0, 1) == '&').collect()]
# removing the '&' at every starting of the list values
list_2 = [list_val[1:] for list_val in list_1]
tst_1 = tst.withColumn("val_extract", when(tst.inp_val.substr(0, 1) == '&', regexp(tst.inp_val, "&", "")).otherwise(tst.inp_val))
for val in list_2:
   df_leaf = tst_1.select(tst_1.val_extract).where(tst_1.inp_col == val)
   list_3 = [row['val_extract'] for row in df_leaf.collect()]

   tst_1 = tst_1.withColumn('bool', when(tst_1.val_extract == val, 'True').otherwise('False'))
   tst_1 = tst_1.withColumn('val_extract', when(tst_1.bool == 'True', str(list_3)).otherwise(tst_1.val_extract)).drop('bool')

Обновленный ожидаемый результат:

|comment|inp_col|inp_val|inp_extract                  |
|11     |a      |a1     |['a1']                       |
|12     |a      |a2     |['a2']                       |
|12     |f      |&a     |['a1, 'a2']                  |
|12     |f      |f9     |['f9']                       |
|15     |b      |b3     |['b3']                       |
|16     |b      |b4     |['b4']                       |
|17     |c      |&b     |['b3', 'b4']                 |
|18     |c      |c5     |['c5']                       |
|19     |d      |&c     |['b3', 'b4', 'c5']           |
|20     |d      |d6     |['d6']                       |
|21     |e      |&d     |['b3', 'b4', 'c5', 'd6']     |
|22     |e      |e7     |['e7']                       |

После этого я могу попробовать взорваться, чтобы получить несколько строк. Но результат aove - это то, что нам нужно, и мы не можем получить определенный процентный результат.

Ответы [ 2 ]

2 голосов
/ 11 июля 2020

Если вы действительно хотите избежать использования графиков и ваш случай не сложнее, чем показано выше, попробуйте следующее.

from pyspark.sql import functions as F

df.show() #sampledataframe

#+-------+---------+---------+
#|comment|input_col|input_val|
#+-------+---------+---------+
#|     11|        a|       a1|
#|     12|        a|       a2|
#|     12|        f|       &a|
#|     12|        f|       f9|
#|     15|        b|       b3|
#|     16|        b|       b4|
#|     17|        c|       &b|
#|     17|        c|       c5|
#|     17|        d|       &c|
#|     17|        d|       d6|
#|     17|        e|       &d|
#|     17|        e|       e7|
#+-------+---------+---------+

df1=df.join(df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))\
          .withColumnRenamed("input_col","x1"),F.expr("""input_val rlike x1"""),'left')\
  .withColumn("new_col", F.when(F.expr("""substring(input_val,0,1)!""")!=F.lit('&'), F.array("input_val"))\
                    .otherwise(F.col("y1"))).drop("x1","y1")

df2=df1.join(df1.selectExpr("input_val as input_val1","new_col as new_col1"), F.expr("""array_contains(new_col,input_val1) and\
           substring(input_val1,0,1)=='&'"""),'left')


df2.join(df2.selectExpr("input_val1 as val2","new_col1 as col2")\
         .dropna(),F.expr("""array_contains(new_col1,val2)"""),'left')\
  .withColumn("inp_extract", F.when(F.expr("""substring(input_val,0,1)!='&'"""), F.col("new_col"))\
                        .otherwise(F.expr("""filter(concat(\
                        coalesce(new_col,array()),\
                        coalesce(new_col1,array()),\
                        coalesce(col2, array()))\
                        ,x-> x is not null and substring(x,0,1)!='&')""")))\

  .select("comment","input_col","input_val",F.array_sort("inp_extract").alias("inp_extract")).show()

#+-------+---------+---------+----------------+
#|comment|input_col|input_val|     inp_extract|
#+-------+---------+---------+----------------+
#|     11|        a|       a1|            [a1]|
#|     12|        a|       a2|            [a2]|
#|     12|        f|       &a|        [a1, a2]|
#|     12|        f|       f9|            [f9]|
#|     15|        b|       b3|            [b3]|
#|     16|        b|       b4|            [b4]|
#|     17|        c|       &b|        [b3, b4]|
#|     17|        c|       c5|            [c5]|
#|     17|        d|       &c|    [b3, b4, c5]|
#|     17|        d|       d6|            [d6]|
#|     17|        e|       &d|[b3, b4, c5, d6]|
#|     17|        e|       e7|            [e7]|
#+-------+---------+---------+----------------+
0 голосов
/ 10 июля 2020

Вы можете присоединить фрейм данных к самому себе, чтобы получить это.

input : 
df.show()

+-------+-------+---------+
|comment|inp_col|input_val|
+-------+-------+---------+
|     11|      a|       a1|
|     12|      a|       a2|
|     13|      f|       &a|
|     14|      b|       b3|
|     15|      b|       b4|
|     16|      d|       &b|
+-------+-------+---------+

import pyspark.sql.functions as F


df.createOrReplaceTempView("df1")
df.withColumn("input_val", F.regexp_replace(F.col("input_val"), "&", "")).createOrReplaceTempView("df2")

spark.sql("""select * from (select coalesce(df2.comment,df1.comment) as comment , 
coalesce(df2.inp_col,df1.inp_col) as inp_col,
 coalesce(df2.input_val,df2.input_val) as input_val ,
 case when df1.input_val is not null then df1.input_val else df2.input_val end  as output
 from df1  full outer join df2 on df2.input_val = df1.inp_col) where input_val is not null order by comment  """).show()
Output
+-------+-------+---------+------+
|comment|inp_col|input_val|output|
+-------+-------+---------+------+
|     11|      a|       a1|    a1|
|     12|      a|       a2|    a2|
|     13|      f|        a|    a1|
|     13|      f|        a|    a2|
|     14|      b|       b3|    b3|
|     15|      b|       b4|    b4|
|     16|      d|        b|    b3|
|     16|      d|        b|    b4|
+-------+-------+---------+------+
...