PySpark: выберите столбец на основе условия, в котором значения других столбцов соответствуют некоторым определенным c значениям, затем создайте результат сопоставления как новый столбец - PullRequest
1 голос
/ 29 мая 2020

Я задавал сходство вопросы раньше, но по каким-то причинам печально, что мне приходится переопределить его в PySpark.
Например,

app      col1

app1     anybody love me?
app2     I hate u
app3     this hat is good
app4     I don't like this one
app5     oh my god
app6     damn you.
app7     such nice girl
app8     xxxxx
app9     pretty prefect
app10    don't love me.
app11    xxx anybody?

Я хочу сопоставить список ключевых слов, например ['anybody', 'love', 'you', 'xxx', 'don't'], и выбрать результат сопоставленного ключевого слова в качестве нового столбца с именем ключевого слова следующим образом:

app      keyword

app1     [anybody, love]
app4     [don't]
app6     [you]
app8     [xxx]
app10    [don't, love]
app11    [xxx]

В качестве принятого ответа подходящий способ, которым я могу сделать, - это создать временный dataframe, который преобразуется списком строк, затем inner join эти два кадра данных вместе.
и select строки app и keyword, которые совпадают в условии.

-- Hiveql implementation
select t.app, k.keyword
from  mytable t
inner join (values ('anybody'), ('you'), ('xxx'), ('don''t')) as k(keyword)
    on t.col1 like conca('%', k.keyword, '%')


Но я не знаком с PySpark, и мне неудобно реализовывать его заново.
Может ли кто-нибудь мне помочь?
Заранее спасибо.

1 Ответ

1 голос
/ 30 мая 2020

Ниже представлены два возможных подхода:

Вариант 1

Первый вариант - использовать API фрейма данных для реализации аналогичного соединения, как в предыдущем вопросе. Здесь мы конвертируем список keywords в фрейм данных, а затем присоединяем его к большому фрейму данных ( обратите внимание, что мы транслируем маленький фрейм данных, чтобы обеспечить лучшую производительность ):

from pyspark.sql.functions import broadcast

df = spark.createDataFrame([
  ["app1", "anybody love me?"],
  ["app4", "I don't like this one"],
  ["app5", "oh my god"],
  ["app6", "damn you."],
  ["app7", "such nice girl"],
  ["app8", "xxxxx"],
  ["app10", "don't love me."]
]).toDF("app", "col1")

# create keywords dataframe
kdf = spark.createDataFrame([(k,) for k in keywords], "key string")

# +-----+
# |  key|
# +-----+
# |  xxx|
# |don't|
# +-----+

df.join(broadcast(kdf), df["col1"].contains(kdf["key"]), "inner")

# +-----+---------------------+-----+
# |app  |col1                 |key  |
# +-----+---------------------+-----+
# |app4 |I don't like this one|don't|
# |app8 |xxxxx                |xxx  |
# |app10|don't love me.       |don't|
# +-----+---------------------+-----+

Объединение Условие основано на содержит функцию класса Column.

Вариант 2

Вы также можете использовать функцию высокого порядка PySpark фильтр в сочетании с rlike в выражении:

from pyspark.sql.functions import lit, expr, array

df = spark.createDataFrame([
  ["app1", "anybody love me?"],
  ["app4", "I don't like this one"],
  ["app5", "oh my god"],
  ["app6", "damn you."],
  ["app7", "such nice girl"],
  ["app8", "xxxxx"],
  ["app10", "don't love me."]
]).toDF("app", "col1")

keywords = ["xxx", "don't"]

df.withColumn("keywords", array([lit(k) for k in keywords])) \
  .withColumn("keywords", expr("filter(keywords, k -> col1 rlike k)")) \
  .where("size(keywords) > 0") \
  .show(10, False)

# +-----+---------------------+--------+
# |app  |col1                 |keywords|
# +-----+---------------------+--------+
# |app4 |I don't like this one|[don't] |
# |app8 |xxxxx                |[xxx]   |
# |app10|don't love me.       |[don't] |
# +-----+---------------------+--------+

Пояснение

  1. с array([lit(k) for k in keywords]) мы сгенерируйте массив, содержащий ключевые слова, на которых будет основан наш поиск, а затем мы добавим его к существующему фрейму данных, используя withColumn.

  2. затем с expr("size(filter(keywords, k -> col1 rlike k)) > 0") мы go через элементы ключевых слов, пытающиеся выяснить, присутствует ли какое-либо из них в тексте col1. Если это так, filter вернет один или несколько элементов, а size будет больше 0, что составляет наше where условие для получения записей.

...