Извлечение параметров URL с использованием Python и PySpark - PullRequest
1 голос
/ 07 января 2020

Скажем, у меня есть столбец, заполненный URL-адресами, как показано ниже:

+------------------------------------------+
|url                                       |
+------------------------------------------+
|https://www.example1.com?param1=1&param2=a|
|https://www.example2.com?param1=2&param2=b|
|https://www.example3.com?param1=3&param2=c|
+------------------------------------------+

Каков наилучший способ извлечения параметров URL-адресов из этого столбца и добавления их в виде столбцов к фрейму данных для создания ниже?

+-------------------------------------------+---------------+
|                                        url| param1| param2|
+-------------------------------------------+---------------+
|https://www.example1.com?param1=1&param2=a |      1|      a|
|https://www.example2.com?param1=2&param2=b |      2|      b|
|https://www.example3.com?param1=3&param2=c |      3|      c|
|etc...                                     | etc...| etc...|
+-------------------------------------------+---------------+

Мои попытки

Я могу придумать два возможных способа сделать это, используя functions.regexp_extract из библиотеки pyspark или urllib.parse.parse_qs и urllib.parse.urlparse из стандартная библиотека. В первом решении используется регулярное выражение, которое является хитрым методом извлечения параметров из строк, но последнее должно быть заключено в используемую UDF.

from pyspark.sql import *
from pyspark.sql import functions as fn

df = spark.createDataFrame(
  [
    ("https://www.example.com?param1=1&param2=a",),
    ("https://www.example2.com?param1=2&param2=b",),
    ("https://www.example3.com?param1=3&param2=c",)
  ],
  ["url"]
)

Решение Regex:

df2 = df.withColumn("param1", fn.regexp_extract('url', 'param1=(\d)', 1))
df2 = df2.withColumn("param2", fn.regexp_extract('url', 'param2=([a-z])', 1))
df2.show()

>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+

Решение UDF:

from urllib.parse import urlparse, parse_qs
from pyspark.sql.types import MapType, StringType
extract_params = udf(lambda x: {k: v[0] for k, v in parse_qs(urlparse(x).query).items()}, MapType(StringType(), StringType()))

df3 = df.withColumn(
  "params", extract_params(df.url)
)

df3.withColumn(
  "param1", df3.params['param1']
).withColumn(
  "param2", df3.params['param2']
).drop("params").show()

>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+

Я хотел бы использовать универсальность библиотеки, подобной urllib, но также хотел бы оптимизировать ее написание в функциях pyspark. Есть ли лучший способ, чем два, которые я пробовал до сих пор?

Ответы [ 4 ]

1 голос
/ 07 января 2020

Вот простое решение с использованием только встроенных функций Spark. С функцией parse_url, которая может использоваться только в запросе SQL, но вы можете использовать ее с expr.

Сначала проанализируйте URL-адрес, затем разделите и разберите, чтобы получить параметры и их значения и, наконец, поворот, чтобы получить каждый параметр в виде столбца.

df.withColumn("parsed_url", explode(split(expr("parse_url(url, 'QUERY')"), "&"))) \
    .withColumn("parsed_url", split("parsed_url", "=")) \
    .select("url",
            col("parsed_url").getItem(0).alias("param_name"),
            col("parsed_url").getItem(1).alias("value")
            ) \
    .groupBy("url") \
    .pivot("param_name") \
    .agg(first("value")) \
    .show()

Дает:

+------------------------------------------+------+------+
|url                                       |param1|param2|
+------------------------------------------+------+------+
|https://www.example2.com?param1=2&param2=b|2     |b     |
|https://www.example.com?param1=1&param2=a |1     |a     |
|https://www.example3.com?param1=3&param2=c|3     |c     |
+------------------------------------------+------+------+
0 голосов
/ 08 января 2020

Вот еще одно решение, которое работает для Spark> = 2.4, поскольку оно использует функцию высокого порядка filter .

Следующее решение основано на предположении, что все записи имеют одинаковое число параметры запроса:

from pyspark.sql.functions import expr, col

# get the query string for the first non null url
query = df.filter(df["url"].isNotNull()).first()["url"].split("?")[1]

# extract parameters (this should remain the same for all the records)
params = list(map(lambda p: p.split("=")[0], query.split("&")))

# you can also omit the two previous lines (query parameters autodiscovery)
# and replace them with: params = ['param1', 'param2']
# when you know beforehand the query parameters

cols = [col('url')] + [expr(f"split( \
                                    filter( \
                                          split(split(url,'\\\?')[1], '&'), \
                                          p -> p like '{qp}=%' \
                                    )[0], \
                            '=')[1]").alias(qp) 
                       for qp in params]

df.select(*cols).show(10, False)

# +------------------------------------------+------+------+
# |url                                       |param1|param2|
# +------------------------------------------+------+------+
# |https://www.example.com?param1=1&param2=a |1     |a     |
# |https://www.example2.com?param1=2&param2=b|2     |b     |
# |https://www.example3.com?param1=3&param2=c|3     |c     |
# +------------------------------------------+------+------+

Объяснение:

  1. split(split(url,'\\\?')[1], '&') -> [param1=1,param2=a]: сначала разделить на? чтобы получить строку запроса, затем с помощью &. В результате мы получаем массив [param1=1,param2=a]

  2. filter(... , p -> p like '{qp}=%')[0] -> param1=1, param2=a ...: применить фильтр к элементам массива, которые мы получили из предыдущего шаг и примените фильтр p -> p like '{qp}=%', где {qp}=% имя параметра, т.е. param1=%. qp обозначает элементы массива params. Фильтр вернет массив, поэтому мы просто обращаемся к первому элементу, поскольку знаем, что конкретный param должен существовать всегда. Для первого параметра это вернет param1=1, для второго param2=a et c.

  3. split( ... , '=')[1] -> 1, a, ...: в конечном итоге разделится на = для получить значение параметра запроса. Здесь мы возвращаем второе значение, так как первое будет именем параметра запроса.

Основная идея здесь заключается в том, что мы разделяем задачу на две подзадачи, сначала получаем все возможные параметры запроса, а затем мы извлекаем значения для всех URL-адресов.

Почему это? Что ж, вы действительно можете использовать pivot, поскольку @blackbishop уже блестяще реализован, хотя я считаю, что это не сработает , если количество параметров запроса очень велико , то есть 500 или более уникальных параметров. Это потребует большой случайности, которая может вызвать исключение OOM. С другой стороны, если вы уже знаете, что количество данных низкое, то решение @ blackbishop следует считать идеальным для всех случаев.

Для решения предыдущей проблемы лучше сначала найти все параметры запроса (здесь я только что предположил, что все запросы имеют идентичные параметры, но реализация для этой части должна быть аналогична предыдущей), а затем применить приведенное выше выражение для каждого param, чтобы извлечь значения параметров. Это сгенерирует выражение select, которое будет содержать несколько выражений expr, хотя это не должно вызывать проблем с производительностью, поскольку select является узким преобразованием и не вызывает случайного перемешивания.

0 голосов
/ 07 января 2020

Вы можете добавить функцию разделения следующим образом.

from pyspark.sql import functions as f
df3 = df3.withColumn("param1", f.split(f.split(df3.url, "param1=")[1], "&")[0])
0 голосов
/ 07 января 2020

Я go с UDF и более общим c форматом вывода, используя тип карты.

from urllib.parse import urlparse, parse_qs

from pyspark.sql import functions as F, Types as T

@F.udf(T.MapType(T.StringType(), T.ArrayType(T.StringType())))
def url_param_pars(url):
    parsed = urlparse(url) 
    return parse_qs(parsed.query)

df_params = df.withColumn("params", url_param_pars(F.col('url')))

df_params.show(truncate=False)
+------------------------------------------+------------------------------+
|url                                       |params                        |
+------------------------------------------+------------------------------+
|https://www.example.com?param1=1&param2=a |[param1 -> [1], param2 -> [a]]|
|https://www.example2.com?param1=2&param2=b|[param1 -> [2], param2 -> [b]]|
|https://www.example3.com?param1=3&param2=c|[param1 -> [3], param2 -> [c]]|
+------------------------------------------+------------------------------+

df_params.printSchema()                                                                                                         
root
 |-- url: string (nullable = true)
 |-- params: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = true)

С помощью этого метода вы можете иметь любое количество параметров.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...