Pyspark Преобразование столбца списков в столбец вложенной структуры - PullRequest
0 голосов
/ 20 октября 2018

Я пытаюсь преобразовать некрасивый набор текстовых строк в репрезентативный фрейм данных PySpark.Я застрял на последнем этапе преобразования столбца, который содержит список строк, в столбец, который содержит вложенную структуру строк.Для каждой строки в списке я нормализую ее к тем же полям с пониманием словаря Python.Когда я пытаюсь затем преобразовать это через udf в столбце, это терпит неудачу.

Мой столбец 'records' содержит списки строк, которые похожи на эти ...

['field1, field2, field3, field4', 'field1, field2, field3, field4'..]

к счастью, структура строк хорошо определена, содержит строки и целые, поэтому у меня есть Pythonпонимание словаря, которое просто разбивает и присваивает имена.

def extract_fields(row: str) -> dict:
  fields = row.split(",")
  return { 'field1': fields[0], 'field2': fields[1], ...} 

это прекрасно работает на одной строке как преобразование в строку

from pyspark.sql import Row
Row(**extract_fields( sample_string))

Итак, я подумал, что мог бы использовать UDF, чтобы затем преобразовать столбец в столбец вложенногосостав.

nest = sqlfn.udf(lambda x: [Row(**extract_fields(row)) for row in x])

Обычно я бы добавил тип, возвращаемый для UDF, но я не могу понять, как указать массив строк.Я не получаю ошибку, пока не выполню позже.

Итак, теперь, когда я пытаюсь применить это к моему фрейму данных,

test = df.select(nest(df.records).alias('expanded')
test.show(5)

я получаю эту ошибку:

expected zero arguments for construction of ClassDict (for 
pyspark.sql.types._create_row)

Другие вопросы, которые я нашел, относились кэта ошибка, кажется, указывает на то, что в их словаре есть ошибка типа, но в моем случае мой словарь имеет тип string и integer.Я также попробовал на крошечном примере только с одним списком строк и получил тот же ответ.

Мой ожидаемый результат заключается в том, что новый столбец «расширен» будет столбцом со структурой вложенных строк, где отдельная строка в этом столбце будет выглядеть следующим образом:

Row(expanded = [Row(field1='x11', field2='x12',...), Row(field1='x21', 
field2='x22',....) ] )

Любой совет?

1 Ответ

0 голосов
/ 20 октября 2018

TL; DR pyspark.sql.Row объекты не могут быть возвращены из udf.

Известная форма :

Если схема хорошо определена и в результате вы не получите array<struct<...>>, вам следует использовать стандартный tuple.В этом случае базовая функция синтаксического анализа может быть реализована следующим образом *:

from typing import List, Tuple

def extract_fields(row: str) -> Tuple[str]:
    # Here we assume that each element has the  expected number of fields
    # In practice you should validate the data
    return tuple(row.split(","))

и предоставить схему вывода для udf:

schema = ("array<struct<"
          "field1: string, field2: string, field3: string, field4: string"
          ">>")

@sqlfn.udf(schema)
def extract_multile_fields(rows: List[str]) -> List[Tuple[str]]:
    return [extract_fields(row) for row in rows]

result = df.select(extract_multile_fields("x"))
result.show(truncate=False)
+--------------------------------------------------------------------------+
|extract_multile_fields(x)                                                 |
+--------------------------------------------------------------------------+
|[[field1,  field2,  field3,  field4], [field1,  field2,  field3,  field4]]|
+--------------------------------------------------------------------------+

Есликоличество полей велико, вы можете предпочесть конструировать схему программно, используя строку DDL:

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

schema = ArrayType(StructType(
    [StructField(f"field$i", StringType()) for i in range(1, 5)]
))

В Spark 2.4 или новее вы также можете напрямую использовать встроенные функции:

from pyspark.sql.column import Column

def extract_multile_fields_(col: str) -> Column:
    return sqlfn.expr(f"""transform(
        -- Here we parameterize input with {{}}
        transform(`{col}`, s -> split(s, ',')),  
        -- Adjust the list of fields and cast if necessary 
        a -> struct(
            a[0] as field1, a[1] as field2, a[2] as field3, a[3] as field4)
    )""")


result = df.select(extract_multile_fields_("x").alias("records"))
result.show(truncate=False)
+--------------------------------------------------------------------------+
|records                                                                   |
+--------------------------------------------------------------------------+
|[[field1,  field2,  field3,  field4], [field1,  field2,  field3,  field4]]|
+--------------------------------------------------------------------------+

Неизвестная форма :

Если форма данных неизвестна, то array<struct<...>> не является правильным выбором DataType.В этом случае вы можете попытаться использовать array<map<..., ...>>, однако для этого необходимо, чтобы все значения были одного типа:

from typing import Dict

def extract_fields(row: str) -> Dict[str, str]:
    return ... # TODO: Implement the logic

@sqlfn.udf("array<map<string, string>>")
def extract_multile_fields(rows: List[str]) -> List[Dict[str, str]]:
    return [extract_fields(row) for row in rows]

* Обратите внимание, что все записи должны иметь одинаковую форму,Если некоторые поля отсутствуют.Вы должны заполнить пробелы с None.

...