Как объединить два списка по позиции в фрейме данных с помощью PySpark - PullRequest
1 голос
/ 17 июня 2020

У меня есть фрейм данных, как показано ниже.

Текущий фрейм данных

+---+--------+---------=+
| id|size    |variantID |
+---+----+---+----------+
|  1| [10,20]| [150,160]|
|  2| [2]    | [1]      |
|  3| []     |   []     |
+---+--------+----------+

Я хочу добавить новый столбец, объединив массив размера и массив идентификаторов вариантов по позиции с этим символом (|). Отсюда мне нужен новый массив с именем sizeMap. Количество элементов в поле Size такое же, как в столбце optionID.

Ожидаемый результат:

+---+--------+---------------------------+
| id|size    |variantID |sizeMap         |
+---+----+---+---------------------------+
|  1| [10,20]| [150,160]|[10|150, 20|160]|
|  2| [2]    | [1]      |  [2|1]         |
|  3| []     |   []     |   []           |
+---+--------+----------------------------+

Можете ли вы помочь мне решить эту проблему ...!

Ответы [ 2 ]

2 голосов
/ 17 июня 2020

Возможно, это полезно (записано в scala, но может использоваться в pyspark с минимальными изменениями)

Загрузить предоставленные тестовые данные

  val df =
      spark.sql(
        """
          |select id, size, variantID from values
          | (1, array(10, 20), array(150, 160)),
          | (2, array(2), array(1)),
          | (3, array(null), array(null))
          | T(id, size, variantID)
        """.stripMargin)
    df.show(false)
    df.printSchema()
    /**
      * +---+--------+----------+
      * |id |size    |variantID |
      * +---+--------+----------+
      * |1  |[10, 20]|[150, 160]|
      * |2  |[2]     |[1]       |
      * |3  |[]      |[]        |
      * +---+--------+----------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- size: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- variantID: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      */

zip 2 массив по позиции ( Без UDF)

    val p = df.withColumn("sizeMap", arrays_zip($"size", $"variantID"))
      .withColumn("sizeMap", expr("TRANSFORM(sizeMap, x -> concat_ws('|', x.size, x.variantID))"))
    p.show(false)
    p.printSchema()

    /**
      * +---+--------+----------+----------------+
      * |id |size    |variantID |sizeMap         |
      * +---+--------+----------+----------------+
      * |1  |[10, 20]|[150, 160]|[10|150, 20|160]|
      * |2  |[2]     |[1]       |[2|1]           |
      * |3  |[]      |[]        |[]              |
      * +---+--------+----------+----------------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- size: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- variantID: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- sizeMap: array (nullable = false)
      * |    |-- element: string (containsNull = false)
      */
0 голосов
/ 17 июня 2020

У меня есть решение, которое будет работать. Но может быть медленным для больших данных из-за UDF. Также последний столбец будет строкой, поскольку в нем есть строковый вертикальный символ '|'.

from pyspark.sql.functions import *
from pyspark.sql.types import *
values = [(1,[10,20], [150,160]), 
          (2,[2], [2|1]  ), 
          (3,[], [])]
rdd = sc.parallelize(values)
schema = StructType([StructField("id", IntegerType(), True),
                     StructField("size", ArrayType(IntegerType()), True),
                    StructField("variantID", ArrayType(IntegerType()), True)])
data = spark.createDataFrame(rdd, schema)
data.show()
"""
+---+--------+----------+
| id|    size| variantID|
+---+--------+----------+
|  1|[10, 20]|[150, 160]|
|  2|     [2]|       [3]|
|  3|      []|        []|
+---+--------+----------+
"""
def arrangeAsReuired(inputString) :
  inputString = inputString.replace("[","").replace("]","")
  if inputString.strip() in "[]&[]" :
    sizeMapPopulated = "[]"  
  else :
    firstArray = inputString.split("&")[0].split(",")
    secondArray = inputString.split("&")[1].split(",")
    sizeMapPopulated = [str(firstArray[x]) + "|" + str(secondArray[x]) for x in range(0, len(firstArray), 1)]
  return str(sizeMapPopulated)
udfToReturnData = udf(lambda z: arrangeAsReuired(z), StringType())
spark.udf.register("udfToReturnData", udfToReturnData)

data = data.withColumn("sizeMap", \
                       udfToReturnData(concat(col("size").cast("string"), lit("&"), col("variantID").cast("string")).cast("string"))) \
           .select("id","size","sizeMap")
data.show(20,False)
"""
+---+--------+----------------------+
|id |size    |sizeMap               |
+---+--------+----------------------+
|1  |[10, 20]|['10|150', ' 20| 160']|
|2  |[2]     |['2|3']               |
|3  |[]      |[]                    |
+---+--------+----------------------+
"""
...