Обновленный ответ :
Это можно сделать без UDF, если предположить, что массивы имеют одинаковую длину. Давайте попробуем следующее.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import col
spark = SparkSession.builder.appName('prefix_finder').getOrCreate()
# sample data creation
my_df = spark.createDataFrame(
[('scooby', ['cartoon', 'kidfriendly']),
('batman', ['dark', 'cars']),
('meshuggah', ['heavy', 'dark']),
('guthrie', ['god', 'guitar'])
]
, schema=('character', 'tags'))
Фрейм данных my_df
будет выглядеть следующим образом:
+---------+----------------------+
|character|tags |
+---------+----------------------+
|scooby |[cartoon, kidfriendly]|
|batman |[dark, cars] |
|meshuggah|[heavy, dark] |
|guthrie |[god, guitar] |
+---------+----------------------+
Если мы ищем префикс car , только 1-й и 2-й ряд должны быть возвращены, потому что car является префиксом cartoon и cars .
. достигните этого.
num_items_in_arr = 2 # this was the assumption
prefix = 'car'
my_df2 = my_df.select(col('character'), col('tags'), *(col('tags').getItem(i).alias(f'tag{i}') for i in range(num_items_in_arr)))
Фрейм данных my_df2
будет выглядеть следующим образом:
+---------+----------------------+-------+-----------+
|character|tags |tag0 |tag1 |
+---------+----------------------+-------+-----------+
|scooby |[cartoon, kidfriendly]|cartoon|kidfriendly|
|batman |[dark, cars] |dark |cars |
|meshuggah|[insane, heavy] |insane |heavy |
|guthrie |[god, guitar] |god |guitar |
+---------+----------------------+-------+-----------+
Давайте создадим столбец concat_tags на my_df2
, который мы будет использоваться для сопоставления регулярных выражений.
cols_of_interest = [f'tag{i}' for i in range(num_items_in_arr)]
for idx, col_name in enumerate(cols_of_interest):
my_df2 = my_df2.withColumn(col_name, f.substring(col_name, 1, prefix_len))
if idx == 0:
my_df2 = my_df2.withColumn(col_name, f.concat(lit("("), col_name, lit(".*")))
elif idx == len(cols_to_update_concat) - 1:
my_df2 = my_df2.withColumn(col_name, f.concat(col_name, lit(".*)")))
else:
my_df2 = my_df2.withColumn(col_name, f.concat(col_name, lit(".*")))
my_df3 = my_df2.withColumn('concat_tags', f.concat_ws('|', *cols_of_interest)).drop(*cols_of_interest)
my_df3
выглядит следующим образом:
+---------+----------------------+-------------+
|character|tags |concat_tags |
+---------+----------------------+-------------+
|scooby |[cartoon, kidfriendly]|(car.*|kid.*)|
|batman |[dark, cars] |(dar.*|car.*)|
|meshuggah|[insane, heavy] |(ins.*|hea.*)|
|guthrie |[god, guitar] |(god.*|gui.*)|
+---------+----------------------+-------------+
Теперь нам нужно применить сопоставление регулярных выражений для столбца concat_tags .
my_df4 = my_df3.withColumn('matched', f.expr(r"regexp_extract(prefix, concat_tags, 0)"))
Результат выглядит следующим образом:
+---------+----------------------+-------------+-------+
|character|tags |concat_tags |matched|
+---------+----------------------+-------------+-------+
|scooby |[cartoon, kidfriendly]|(car.*|kid.*)|car |
|batman |[dark, cars] |(dar.*|car.*)|car |
|meshuggah|[insane, heavy] |(ins.*|hea.*)| |
|guthrie |[god, guitar] |(god.*|gui.*)| |
+---------+----------------------+-------------+-------+
Немного очистки.
my_df5 = my_df4.filter(my_df4.matched != "").drop('concat_tags', 'matched')
И вот мы с окончательным кадром данных:
+---------+----------------------+
|character|tags |
+---------+----------------------+
|scooby |[cartoon, kidfriendly]|
|batman |[dark, cars] |
+---------+----------------------+