Pandas определяемая пользователем функция (UDF) - возможно ли вернуть логическое значение? - PullRequest
0 голосов
/ 26 февраля 2020

Я пытаюсь написать функцию как Pandas UDF, которая проверяла бы, начинается ли какой-либо элемент массива строк с определенного значения. Результат, который я ищу, будет выглядеть примерно так:

df.filter(list_contains(val, df.stringArray_column)).show()

Функция list_contains будет возвращать True в каждой строке, где любой элемент df.stringArray начинается с val.

Просто пример:

df = spark.read.csv(path)
display(df.filter(list_contains('50', df.stringArray_column)))

Этот код выше будет отображать каждую строку df, где элемент столбца stringArray начинается с 50.

Я написал функцию в python, что очень медленно

    def list_contains(val):
    # Perfom what ListContains generated
  def list_contains_udf(column_list):
    for element in column_list:
      if element.startswith(val):
        return True
    return False
  return udf(list_contains_udf, BooleanType())

Спасибо за вашу помощь.

РЕДАКТИРОВАТЬ: Вот некоторые примеры данных, а также пример вывода, который я ищу:

df.LIST: ["408000","641100"]
         ["633400","641100"]
         ["633400","791100"]
         ["633400","408100"]
         ["633400","641100"]
         ["408110","641230"]
         ["633400","647200"]

display(df.select('LIST').filter(list_contains('408')(df.LIST)))

output: LIST
        ["408000","641100"]
        ["633400","408100"]
        ["408110","641230"]

1 Ответ

0 голосов
/ 26 февраля 2020

Обновленный ответ :

Это можно сделать без UDF, если предположить, что массивы имеют одинаковую длину. Давайте попробуем следующее.

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('prefix_finder').getOrCreate()

# sample data creation
my_df = spark.createDataFrame(
    [('scooby', ['cartoon', 'kidfriendly']),
     ('batman', ['dark', 'cars']),
     ('meshuggah', ['heavy', 'dark']),
     ('guthrie', ['god', 'guitar'])
     ]
    , schema=('character', 'tags'))

Фрейм данных my_df будет выглядеть следующим образом:

+---------+----------------------+
|character|tags                  |
+---------+----------------------+
|scooby   |[cartoon, kidfriendly]|
|batman   |[dark, cars]          |
|meshuggah|[heavy, dark]         |
|guthrie  |[god, guitar]         |
+---------+----------------------+

Если мы ищем префикс car , только 1-й и 2-й ряд должны быть возвращены, потому что car является префиксом cartoon и cars .

. достигните этого.

num_items_in_arr = 2 # this was the assumption
prefix = 'car'

my_df2 = my_df.select(col('character'), col('tags'), *(col('tags').getItem(i).alias(f'tag{i}') for i in range(num_items_in_arr)))

Фрейм данных my_df2 будет выглядеть следующим образом:

+---------+----------------------+-------+-----------+
|character|tags                  |tag0   |tag1       |
+---------+----------------------+-------+-----------+
|scooby   |[cartoon, kidfriendly]|cartoon|kidfriendly|
|batman   |[dark, cars]          |dark   |cars       |
|meshuggah|[insane, heavy]       |insane |heavy      |
|guthrie  |[god, guitar]         |god    |guitar     |
+---------+----------------------+-------+-----------+

Давайте создадим столбец concat_tags на my_df2, который мы будет использоваться для сопоставления регулярных выражений.

cols_of_interest = [f'tag{i}' for i in range(num_items_in_arr)]

for idx, col_name in enumerate(cols_of_interest):
    my_df2 = my_df2.withColumn(col_name, f.substring(col_name, 1, prefix_len))

    if idx == 0:
        my_df2 = my_df2.withColumn(col_name, f.concat(lit("("), col_name, lit(".*")))
    elif idx == len(cols_to_update_concat) - 1:
        my_df2 = my_df2.withColumn(col_name, f.concat(col_name, lit(".*)")))
    else:
        my_df2 = my_df2.withColumn(col_name, f.concat(col_name, lit(".*")))

my_df3 = my_df2.withColumn('concat_tags', f.concat_ws('|', *cols_of_interest)).drop(*cols_of_interest)

my_df3 выглядит следующим образом:

+---------+----------------------+-------------+
|character|tags                  |concat_tags  |
+---------+----------------------+-------------+
|scooby   |[cartoon, kidfriendly]|(car.*|kid.*)|
|batman   |[dark, cars]          |(dar.*|car.*)|
|meshuggah|[insane, heavy]       |(ins.*|hea.*)|
|guthrie  |[god, guitar]         |(god.*|gui.*)|
+---------+----------------------+-------------+

Теперь нам нужно применить сопоставление регулярных выражений для столбца concat_tags .

my_df4 = my_df3.withColumn('matched', f.expr(r"regexp_extract(prefix, concat_tags, 0)"))

Результат выглядит следующим образом:

+---------+----------------------+-------------+-------+
|character|tags                  |concat_tags  |matched|
+---------+----------------------+-------------+-------+
|scooby   |[cartoon, kidfriendly]|(car.*|kid.*)|car    |
|batman   |[dark, cars]          |(dar.*|car.*)|car    |
|meshuggah|[insane, heavy]       |(ins.*|hea.*)|       |
|guthrie  |[god, guitar]         |(god.*|gui.*)|       |
+---------+----------------------+-------------+-------+

Немного очистки.

my_df5 = my_df4.filter(my_df4.matched != "").drop('concat_tags', 'matched')

И вот мы с окончательным кадром данных:

+---------+----------------------+
|character|tags                  |
+---------+----------------------+
|scooby   |[cartoon, kidfriendly]|
|batman   |[dark, cars]          |
+---------+----------------------+
...