Хотите получить массив из массива словарей в pyspark эффективно - PullRequest
1 голос
/ 03 марта 2020

У меня есть кадр данных, как показано ниже:

   +----------+----------+---------------------+
   | Index    |           flagArray            |
   +----------+----------+---------------------+
   |    1     | [{start :1 , end :2, flag :A}, | 
   |          | {start :3 , end :5, flag :A}]  |
   +----------+--------------------------------+
   |    2     | [{start :1 , end :5, flag :A}] |
   +--------- +----------+---------------------+
     root
      |-- index: integer (nullable = true)
      |-- flagArray: array (nullable = true)
      |    |-- element: struct (containsNull = false)
      |    |    |-- flag: string (nullable = true)
      |    |    |-- start: integer (nullable = true)
      |    |    |-- end: integer (nullable = true)

Я хочу получить новый столбец Flag на основе полей начала, конца и флага flagArray.

   +----------+--------------------------------+------------+
   | Index    |           flagArray            |    flag2   |
   +----------+--------------------------------+------------+
   |    1     | [{start :1 , end :2, flag :A}, | [A,A,S,S,S]|
   |          | {start :3 , end :5, flag :A}]  |            |
   +----------+--------------------------------+------------+ 
   |    2     | [{start :1 , end :5, flag :A}] | [A,A,A,A,A]|
   +--------- +--------------------------------+------------+

У меня есть рабочий код, но я хотел сделать это более эффективным способом, так как у меня будет миллионы строк, а у меня будет 300 элементов массива в столбце Флаг:

     @udf(ArrayType(StringType()))
     def set_flag(startIndex,endIndex, flag):
         arraylength = len(startIndex)
         row = []
         for i in range(0,arraylength):
             start = int(startIndex[i])
             end = int(endIndex[i]) + 1
            derFlag = flag[i]
            for i in range(start,endIndex+1):
                row.append(derFlag)
         return row
     df = df.select("*","flagArray.start","flagArray.end","flagArray.flag")
     df = df.withColumn("flag2",set_flag(df.start,df.end,df.flag)).drop("start","end","flag")
...