Как перебрать столбец со списком списков в качестве значений и создать новый столбец - PullRequest
0 голосов
/ 09 июля 2020
• 1000 внутри agg_data фактически находятся объекты "Row", которые были сгруппированы на основе column1 и column2, а затем были объединены в один столбец.

Мне нужно перебрать значения в agg_data, получить "столбец 7" данные из списка всех сохраненных строк, объедините их и добавьте новый столбец во фрейм данных. Что-то вроде этого.

+---------+---------+-----------------------+---------+------------+
| column1 | column2 |       agg_data        | column4 | agg_values |
+---------+---------+-----------------------+---------+------------+
| a       | b       | [[1,2,3,4],[4,5,6,7]] | c       |        3,6 |
| x       | y       | [[1,2,3,4],[4,5,6,7]] | z       |        3,6 |
+---------+---------+-----------------------+---------+------------+

Я новичок в scala, поэтому понятия не имею, как к этому подойти. Тем не менее, я попробовал несколько предложений из Stack Overflow, подобных этому ответ здесь. Но это не сработало, как ожидалось. Он включил все значения в таблице в одну строку.

Ответы [ 4 ]

1 голос
/ 09 июля 2020

Если вы используете Spark 3.0 , вы можете использовать функцию преобразования, как показано ниже

df.withColumn("agg_values", transform($"column3", arr => element_at(arr, -2)))

FOR Spark2.4 +

df.withColumn("agg_values", expr("transform(column3, x -> element_at(x, -2) )"))

Если вы хотите преобразовать добавленный новый массив в строку, вы можете использовать concat_ws

Вывод:

+-------+-------+------------------------------------------+-------+---------+
|column1|column2|column3                                   |column4|agg_values|
+-------+-------+------------------------------------------+-------+---------+
|a      |b      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|c      |[3, 6, 9]|
|x      |y      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|z      |[3, 6, 9]|
+-------+-------+------------------------------------------+-------+---------+
0 голосов
/ 09 июля 2020

spark>=2.4

  val df = spark.sql("select array(array(1,2,3,4),array(4,5,6,7),array(7,8,9,0)) as column3")
    df.show(false)
    df.printSchema()

    /**
      * +------------------------------------------+
      * |column3                                   |
      * +------------------------------------------+
      * |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|
      * +------------------------------------------+
      *
      * root
      * |-- column3: array (nullable = false)
      * |    |-- element: array (containsNull = false)
      * |    |    |-- element: integer (containsNull = false)
      */

    df.withColumn("agg_values", expr("TRANSFORM(column3, x -> element_at(x, -2) )"))
      .show(false)

    /**
      * +------------------------------------------+----------+
      * |column3                                   |agg_values|
      * +------------------------------------------+----------+
      * |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|[3, 6, 9] |
      * +------------------------------------------+----------+
      */
    // use array_join to get string

    df.withColumn("agg_values", expr("TRANSFORM(column3, x -> element_at(x, -2) )"))
      .withColumn("agg_values", array_join(col("agg_values"), ", "))
      .show(false)

    /**
      * +------------------------------------------+----------+
      * |column3                                   |agg_values|
      * +------------------------------------------+----------+
      * |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|3, 6, 9   |
      * +------------------------------------------+----------+
      */
0 голосов
/ 09 июля 2020

Проверьте код ниже.

scala> df.show(false)
+-------+-------+------------------------------------------+-------+
|column1|column2|column3                                   |column4|
+-------+-------+------------------------------------------+-------+
|a      |b      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|c      |
|x      |y      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|z      |
+-------+-------+------------------------------------------+-------+

UDF

scala> val mkString = udf((data:Seq[Seq[Int]]) => data.map(_.init.last).mkString(","))

Результат

scala> df.withColumn("agg_values",mkString($"column3")).show(false)
+-------+-------+------------------------------------------+-------+----------+
|column1|column2|column3                                   |column4|agg_values|
+-------+-------+------------------------------------------+-------+----------+
|a      |b      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|c      |3,6,9     |
|x      |y      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|z      |3,6,9     |
+-------+-------+------------------------------------------+-------+----------+

Без UDF - Spark 2.4 +

scala> 

df
.withColumn("agg_values",expr("concat_ws(',',flatten(transform(column3, x -> slice(x,-2,1))))"))
.show(false)

+-------+-------+------------------------------------------+-------+----------+
|column1|column2|column3                                   |column4|agg_values|
+-------+-------+------------------------------------------+-------+----------+
|a      |b      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|c      |3,6,9     |
|x      |y      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|z      |3,6,9     |
+-------+-------+------------------------------------------+-------+----------+
scala> 

df
.withColumn("agg_values",expr("concat_ws(',',transform(column3, x -> element_at(x,-2)))"))
.show(false)

+-------+-------+------------------------------------------+-------+----------+
|column1|column2|column3                                   |column4|agg_values|
+-------+-------+------------------------------------------+-------+----------+
|a      |b      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|c      |3,6,9     |
|x      |y      |[[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 0]]|z      |3,6,9     |
+-------+-------+------------------------------------------+-------+----------+
0 голосов
/ 09 июля 2020

pyspark, но может быть адаптирован к scala также:

import pyspark.sql.functions as F
from pyspark.sql.types import *
tst = sqlContext.createDataFrame([('a','b',[[1,2,3,4],[4,5,6,7],[7,8,9,0]]),('x','y',[[1,2,3,4],[4,5,6,7],[7,8,9,0]])],schema=['column1','column2','column3'])
tst1 = tst.withColumn("flattened",F.flatten('column3'))
#%% generate position  there may be better python ways to do this
pos=[]
ini=2
for x in range(3):# 3 is for this example
    pos+=[ini]
    ini=ini+4
#%%
expr=[F.col('flattened')[x] for x in pos]
tst2 = tst1.withColumn("result",F.array(*expr))

Результаты:

+-------+-------+--------------------+--------------------+---------+
|column1|column2|             column3|           flattened|   result|
+-------+-------+--------------------+--------------------+---------+
|      a|      b|[[1, 2, 3, 4], [4...|[1, 2, 3, 4, 4, 5...|[3, 6, 9]|
|      x|      y|[[1, 2, 3, 4], [4...|[1, 2, 3, 4, 4, 5...|[3, 6, 9]|
+-------+-------+--------------------+--------------------+---------+
...