Ниже представлены два возможных подхода:
Вариант 1
Первый вариант - использовать API фрейма данных для реализации аналогичного соединения, как в предыдущем вопросе. Здесь мы конвертируем список keywords
в фрейм данных, а затем присоединяем его к большому фрейму данных ( обратите внимание, что мы транслируем маленький фрейм данных, чтобы обеспечить лучшую производительность ):
from pyspark.sql.functions import broadcast
df = spark.createDataFrame([
["app1", "anybody love me?"],
["app4", "I don't like this one"],
["app5", "oh my god"],
["app6", "damn you."],
["app7", "such nice girl"],
["app8", "xxxxx"],
["app10", "don't love me."]
]).toDF("app", "col1")
# create keywords dataframe
kdf = spark.createDataFrame([(k,) for k in keywords], "key string")
# +-----+
# | key|
# +-----+
# | xxx|
# |don't|
# +-----+
df.join(broadcast(kdf), df["col1"].contains(kdf["key"]), "inner")
# +-----+---------------------+-----+
# |app |col1 |key |
# +-----+---------------------+-----+
# |app4 |I don't like this one|don't|
# |app8 |xxxxx |xxx |
# |app10|don't love me. |don't|
# +-----+---------------------+-----+
Объединение Условие основано на содержит функцию класса Column.
Вариант 2
Вы также можете использовать функцию высокого порядка PySpark фильтр в сочетании с rlike в выражении:
from pyspark.sql.functions import lit, expr, array
df = spark.createDataFrame([
["app1", "anybody love me?"],
["app4", "I don't like this one"],
["app5", "oh my god"],
["app6", "damn you."],
["app7", "such nice girl"],
["app8", "xxxxx"],
["app10", "don't love me."]
]).toDF("app", "col1")
keywords = ["xxx", "don't"]
df.withColumn("keywords", array([lit(k) for k in keywords])) \
.withColumn("keywords", expr("filter(keywords, k -> col1 rlike k)")) \
.where("size(keywords) > 0") \
.show(10, False)
# +-----+---------------------+--------+
# |app |col1 |keywords|
# +-----+---------------------+--------+
# |app4 |I don't like this one|[don't] |
# |app8 |xxxxx |[xxx] |
# |app10|don't love me. |[don't] |
# +-----+---------------------+--------+
Пояснение
с array([lit(k) for k in keywords])
мы сгенерируйте массив, содержащий ключевые слова, на которых будет основан наш поиск, а затем мы добавим его к существующему фрейму данных, используя withColumn
.
затем с expr("size(filter(keywords, k -> col1 rlike k)) > 0")
мы go через элементы ключевых слов, пытающиеся выяснить, присутствует ли какое-либо из них в тексте col1. Если это так, filter
вернет один или несколько элементов, а size
будет больше 0, что составляет наше where
условие для получения записей.