Преобразовать строку запроса URI в значение ключа Array of Struct в PySpark - PullRequest
0 голосов
/ 06 марта 2020

У меня есть DataFrame в PySpark со столбцом строки запроса URI (StringType), например:

+--------------+ 
| cs_uri_query |
+--------------+
| a=1&b=2&c=3  |
+--------------+
| d&e=&f=4     |
+--------------+

Мне нужно преобразовать этот столбец в ArrayType элементов StructField со следующей структурой:

ArrayType(StructType([StructField('key', StringType(), nullable=False),
                      StructField('value', StringType(), nullable=True)]))

Мой ожидаемый столбец такой:

+------------------------------------------------------------+ 
| cs_uri_query                                               |
+------------------------------------------------------------+
| [{key=a, value=1},{key=b, value=2},{key=c, value=3}]       |
+------------------------------------------------------------+
| [{key=d, value=null},{key=e, value=null},{key=f, value=4}] |
+------------------------------------------------------------+

UDF - единственный способ, который я нашел для достижения этой цели. Я использую чистые функции Spark и, если это возможно, я бы хотел избежать UDF ... UDF имеют очень плохую производительность в PySpark, в отличие от использования Spark на Scala lang.

Это мой код используя UDF:

def parse_query(query):
    args = None
    if query:
        args = []
        for arg in query.split("&"):
            if arg:
                if "=" in arg:
                    a = arg.split("=")
                    if a[0]:
                        v = a[1] if a[1] else None
                        args.append({"key": a[0], "value": v})
                else:
                    args.append({"key": arg, "value": None})
    return args

uri_query = ArrayType(StructType([StructField('key', StringType(), nullable=True),
                                  StructField('value', StringType(), nullable=True)]))

udf_parse_query = udf(lambda args: parse_query(args), uri_query)

df = df.withColumn("cs_uri_query", udf_parse_query(df["cs_uri_query"]))

Кто-нибудь смог мне открыть глаза с помощью удивительного решения?

1 Ответ

1 голос
/ 06 марта 2020

Для Spark 2.4+ вы можете split на &, а затем использовать функцию transform для преобразования каждого элемента key=value в struct(key, value):

from pyspark.sql.functions import expr

df = spark.createDataFrame([("a=1&b=2&c=3",), ("d&e=&f=4",)], ["cs_uri_query"])

transform_expr = """transform(split(cs_uri_query, '&'),
                 x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
                 )
                 """

df.withColumn("cs_uri_query", expr(transform_expr)).show(truncate=False)

#+------------------------+
#|cs_uri_query            |
#+------------------------+
#|[[a, 1], [b, 2], [c, 3]]|
#|[[d,], [e, ], [f, 4]]   |
#+------------------------+

РЕДАКТИРОВАТЬ

Если вы хотите отфильтровать ключи, которые являются нулевыми или пустыми, вы можете использовать filter вместе с приведенным выше выражением преобразования:

transform_expr = """filter(transform(split(cs_uri_query, '&'),
                                     x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
                           ),
                           x -> ifnull(x.key, '') <> ''
                    )
                 """
...