Pyspark, фильтрующий элементы в столбце списков - PullRequest
2 голосов
/ 27 марта 2020

Я пытаюсь отфильтровать данные в фрейме данных. Кадр данных df имеет 2 столбца - query + href. В одной строке: query - случайная строка, а href - список строк. У меня есть еще один список под названием urls со строками.

Поиск URL-адреса из списка urls внутри списка столбцов href + позиция URL в списке href. Я пытался df.filter(col("href")).isin(urls), но pyspark жалуется на список. + Я не могу сделать .collect () bcs объема данных.

Заранее спасибо!

В основном это должно выглядеть так, но я не совсем уверен, как это сделать в pyspark:

for url in urls:
    if url in "href item list":
        print(query + url + "href item list".index(url)) # doesn't matter if index or position
    else:
        pass

Пример:

urls = [url1, url2, url3, url4, url5, url6, url7, url8]

query | href
------------
q1    | [url7, url11, url12, url13, url14]
q2    | [url1, url3, url5, url6]
q3    | [url1, url2, url8]

Output should look like 

q2 - url1 - 0
q3 - url1 - 0
q3 - url2 - 1
q2 - url3 - 1
q2 - url5 - 2
q2 - url6 - 3
q1 - url7 - 0
q3 - url8 - 2

Ответы [ 2 ]

0 голосов
/ 27 марта 2020

Я предлагаю 1) сделать из одного столбца DataFrame вашего urls, используя explode и 2) использовать posexplode, чтобы сделать из 3 столбцов DataFrame вашего запроса, href и позицию индекса href, затем 3) внутреннее объединение двух

  1. Создание фрейма данных urls
from pyspark.sql.functions import explode, posexplode

urls = [
    (['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8'],),
]
refs = (
    spark.createDataFrame(urls, ['ref']).
        select(
            explode('ref')
        )
)
refs.show(truncate=False)
# +----+
# |col |
# +----+
# |url1|
# |url2|
# |url3|
# |url4|
# |url5|
# |url6|
# |url7|
# |url8|
# +----+
Создание предоставленных вами примеров данных
data = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
df = spark.createDataFrame(data, ["query", "href"])
df.show(truncate=False)
# +-----+----------------------------------+
# |query|href                              |
# +-----+----------------------------------+
# |q1   |[url7, url11, url12, url13, url14]|
# |q2   |[url1, url3, url5, url6]          |
# |q3   |[url1, url2, url8]                |
# +-----+----------------------------------+
Решение
(
    df.
        select(
            'query',
            posexplode('href')
        ).
        join(
            refs,
            'col',
            'inner'
        ).
        orderBy('col', 'query').
        show(truncate=False)
)
# +----+-----+---+                                                                
# |col |query|pos|
# +----+-----+---+
# |url1|q2   |0  |
# |url1|q3   |0  |
# |url2|q3   |1  |
# |url3|q2   |1  |
# |url5|q2   |2  |
# |url6|q2   |3  |
# |url7|q1   |0  |
# |url8|q3   |2  |
# +----+-----+---+
0 голосов
/ 27 марта 2020

Шаги в словах:

  1. explode столбец href
  2. filter те строки с известным URL
  3. collect the найдите и просмотрите каждый URL в urls

Приведенный ниже код разбит на небольшие шаги, чтобы упростить проверку промежуточных фреймов данных.

Предполагая, что у вас уже есть SparkSession объект с именем ss, мы можем воссоздать ваш оригинальный DataFrame следующим образом:

df = ss.createDataFrame(
    [
        ("q1", ["url7", "url11", "url12", "url13", "url14"]),
        ("q2", ["url1", "url3", "url5", "url6"]),
        ("q3", ["url1", "url2", "url8"]),
    ],
    ["query", "href"],
)
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]

Теперь мы применим шаги, описанные ранее:

import pyspark.sql.functions as sf

# Exploding the column "href".
exp_df = df.select("query", sf.explode(sf.col("href")).alias("href_sing"))
# Checking if the URL in the DataFrame exists in "urls".
# I suggest to convert "urls" into a "set" before this step: "set(urls)". It might 
# improve the performance of "isin", but this is just an optional optimization.
known_df = exp_df.select("*", sf.col("href_sing").isin(urls).alias("is_known"))
# Discard unknown URLs.
true_df = true_df = known_df.filter("is_known = True")
# The final results.
res = [
    (r["query"], r["href_sing"], urls.index(r["href_sing"]))
    for r in true_df.collect()
]

Проверка некоторых значений:

In [18]: df.show()      
+-----+--------------------+
|query|                href|
+-----+--------------------+
|   q1|[url7, url11, url...|
|   q2|[url1, url3, url5...|
|   q3|  [url1, url2, url8]|
+-----+--------------------+

In [19]: exp_df.show()                                                                    
+-----+---------+
|query|href_sing|
+-----+---------+
|   q1|     url7|
|   q1|    url11|
|   q1|    url12|
|   q1|    url13|
|   q1|    url14|
|   q2|     url1|
|   q2|     url3|
|   q2|     url5|
|   q2|     url6|
|   q3|     url1|
|   q3|     url2|
|   q3|     url8|
+-----+---------+

In [20]: true_df.show()                                                                   
+-----+---------+--------+
|query|href_sing|is_known|
+-----+---------+--------+
|   q1|     url7|    true|
|   q2|     url1|    true|
|   q2|     url3|    true|
|   q2|     url5|    true|
|   q2|     url6|    true|
|   q3|     url1|    true|
|   q3|     url2|    true|
|   q3|     url8|    true|
+-----+---------+--------+

In [23]: res                                                                              
Out[23]: 
[('q1', 'url7', 6),
 ('q2', 'url1', 0),
 ('q2', 'url3', 2),
 ('q2', 'url5', 4),
 ('q2', 'url6', 5),
 ('q3', 'url1', 0),
 ('q3', 'url2', 1),
 ('q3', 'url8', 7)]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...