PySpark DataFrame меняет столбец строки на массив перед использованием разнесения - PullRequest
0 голосов
/ 27 ноября 2018

У меня есть столбец с именем event_data в формате json в моем искровом DataFrame, после прочтения его с помощью from_json я получаю следующую схему:

root
 |-- user_id: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- af_content_id: string (nullable = true)
 |    |-- af_currency: string (nullable = true)
 |    |-- af_order_id: long (nullable = true)

Мне нужно толькоaf_content_id из этой колонки.Этот атрибут может иметь различные форматы:

Строка Целое число Список Int и Str.например, ['ghhjj23','123546',12356] Нет (иногда event_data не содержит af_content_id)

Я хочу использовать функцию explode для возврата новой строки для каждого элемента в af_content_id, когда онформата Список .Но, когда я его применяю, я получаю сообщение об ошибке:

from pyspark.sql.functions import explode

def get_content_id(column):
    return column.af_content_id

df_transf_1 = df_transf_1.withColumn(
    "products_basket", 
    get_content_id(df_transf_1.event_data)
)

df_transf_1 = df_transf_1.withColumn(
    "product_id",
    explode(df_transf_1.products_basket)
)

не может разрешить 'взорвать (products_basket)' из-за несоответствия типов данных: ввод в функцию разнесения должен быть массив или тип карты, а не StringType;

Я знаю причину, это из-за разных типов, которые может содержать поле af_content_id, но я не знаю, как его решить.Использование pyspark.sql.functions.array() непосредственно для столбца не работает, потому что он становится массивом массива, и разнесение не приведет к ожидаемому результату.

Пример кода для воспроизведения шага, на котором я застрял:

import pandas as pd

arr = [
    ['b5ad805c-f295-4852-82fc-961a88',12732936],
    ['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
    ['0E3D17EA-BEEF-4931-8104','12909841'],
    ['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
]

df = pd.DataFrame(arr, columns = ['user_id','products_basket'])

df = df[['user_id','products_basket']].astype(str)
df_transf_1 = spark.createDataFrame(df)

Я ищу способ преобразования products_basket в один только возможный формат: Array , чтобы при применении explode он содержал один идентификаторза ряд.

1 Ответ

0 голосов
/ 27 ноября 2018

Если вы начинаете с DataFrame, например:

df_transf_1.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id                         |products_basket               |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88  |12732936                      |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |['Gklb38', '123655']          |
#|0E3D17EA-BEEF-4931-8104         |12909841                      |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+

, где столбец products_basket равен StringType:

df.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: string (nullable = true)

Вы не можете позвонить explodeна products_basket, потому что это не массив или карта.

Один из обходных путей - убрать все начальные / конечные квадратные скобки, а затем разбить строку на ", " (запятая с последующим пробелом).Это преобразует строку в массив строк.

from pyspark.sql.functions import col, regexp_replace, split
df_transf_new= df_transf_1.withColumn(
    "products_basket",
    split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
)

df_transf_new.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id                         |products_basket               |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |
#|0E3D17EA-BEEF-4931-8104         |[12909841]                    |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+

Шаблон регулярного выражения соответствует любому из следующего:

  • (^\[): открывающая квадратная скобка в началестроки
  • (\]$): закрывающая квадратная скобка в конце строки
  • ('): любая одинарная кавычка (потому что строки указаны в кавычках)

и заменяет их пустой строкой.

Предполагается, что ваши данные не содержат необходимых одинарных кавычек или квадратных скобок внутри product_basket.

После split схема нового DataFrame выглядит следующим образом:

df_transf_new.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: array (nullable = true)
# |    |-- element: string (containsNull = true)

Теперь вы можете звонить explode:

from pyspark.sql.functions import explode
df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
#+--------------------------------+------------------------------+----------+
#|user_id                         |products_basket               |product_id|
#+--------------------------------+------------------------------+----------+
#|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |12732936  |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |Gklb38    |
#|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |123655    |
#|0E3D17EA-BEEF-4931-8104         |[12909841]                    |12909841  |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715  |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
#+--------------------------------+------------------------------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...