Подмножество одного столбца массива с другим (логическим) столбцом массива - PullRequest
3 голосов
/ 22 апреля 2019

У меня есть такой Dataframe (в Pyspark 2.3.1):

from pyspark.sql import Row

my_data = spark.createDataFrame([
  Row(a=[9, 3, 4], b=['a', 'b', 'c'], mask=[True, False, False]),
  Row(a=[7, 2, 6, 4], b=['w', 'x', 'y', 'z'], mask=[True, False, True, False])
])
my_data.show(truncate=False)
#+------------+------------+--------------------------+
#|a           |b           |mask                      |
#+------------+------------+--------------------------+
#|[9, 3, 4]   |[a, b, c]   |[true, false, false]      |
#|[7, 2, 6, 4]|[w, x, y, z]|[true, false, true, false]|
#+------------+------------+--------------------------+

Теперь я хотел бы использовать столбец mask для подмножества a и bколонки:

my_desired_output = spark.createDataFrame([
  Row(a=[9], b=['a']),
  Row(a=[7, 6], b=['w', 'y'])
])
my_desired_output.show(truncate=False)
#+------+------+
#|a     |b     |
#+------+------+
#|[9]   |[a]   |
#|[7, 6]|[w, y]|
#+------+------+

Какой «идиоматический» способ добиться этого?Текущее решение, которое у меня есть, включает map -покрытие базового RDD и подмножество с Numpy, которое кажется неэлегантным:

import numpy as np

def subset_with_mask(row):
    mask = np.asarray(row.mask)
    a_masked = np.asarray(row.a)[mask].tolist()
    b_masked = np.asarray(row.b)[mask].tolist()
    return Row(a=a_masked, b=b_masked)

my_desired_output = spark.createDataFrame(my_data.rdd.map(subset_with_mask))

Это лучший способ или есть что-то лучшее (менее многословное и/ или более эффективный) я могу использовать инструменты Spark SQL?

Ответы [ 3 ]

2 голосов
/ 22 апреля 2019

Одним из вариантов является использование UDF, который вы можете дополнительно специализировать по типу данных в массиве:

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T

def _mask_list(lst, mask):
    return np.asarray(lst)[mask].tolist()

mask_array_int = F.udf(_mask_list, T.ArrayType(T.IntegerType()))
mask_array_str = F.udf(_mask_list, T.ArrayType(T.StringType()))

my_desired_output = my_data
my_desired_output = my_desired_output.withColumn(
    'a', mask_array_int(F.col('a'), F.col('mask'))
)
my_desired_output = my_desired_output.withColumn(
    'b', mask_array_str(F.col('b'), F.col('mask'))
)
1 голос
/ 22 апреля 2019

UDF, упомянутые в предыдущем ответе, вероятно, путь до функций массива, добавленных в Spark 2.4.Для полноты приведем реализацию «чистого SQL» до версии 2.4.

from pyspark.sql.functions import *

df = my_data.withColumn("row", monotonically_increasing_id())

df1 = df.select("row", posexplode("a").alias("pos", "a"))
df2 = df.select("row", posexplode("b").alias("pos", "b"))
df3 = df.select("row", posexplode("mask").alias("pos", "mask"))

df1\
    .join(df2, ["row", "pos"])\
    .join(df3, ["row", "pos"])\
    .filter("mask")\
    .groupBy("row")\
    .agg(collect_list("a").alias("a"), collect_list("b").alias("b"))\
    .select("a", "b")\
    .show()

Вывод:

+------+------+
|     a|     b|
+------+------+
|[7, 6]|[w, y]|
|   [9]|   [a]|
+------+------+
0 голосов
/ 23 апреля 2019

Это еще один подход с 2 UDF для архивирования и разархивирования списков:

from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import udf, col, lit

zip_schema = ArrayType(StructType((StructField("a", StringType()), StructField("b", StringType()))))  
unzip_schema = ArrayType(StringType())

zip_udf = udf(my_zip, zip_schema)
unzip_udf = udf(my_unzip, unzip_schema)

df = my_data.withColumn("zipped", zip_udf(col("a"), col("b"), col("mask")))
       .withColumn("a", unzip_udf(col("zipped"), lit(0)))
       .withColumn("b", unzip_udf(col("zipped"), lit(1)))
       .drop("zipped", "mask")

def my_unzip(zipped, indx):
    return  [str(x[indx]) for x in zipped]

def my_zip(a, b, mask):
    return [(x[0], x[1]) for x in zip(a,b,mask) if x[2]]

my_zip отвечает за фильтрацию данных на основе маски и создание кортежа (cola,colb), который также является элементом возвращаемого списка.

my_unzip извлечет данные для определенного индекса из данных, созданных с помощью my_zip .

Вывод:

+------+------+
|     a|     b|
+------+------+
|   [9]|   [a]|
|[7, 6]|[w, y]|
+------+------+
...