TL; DR pyspark.sql.Row
объекты не могут быть возвращены из udf
.
Известная форма :
Если схема хорошо определена и в результате вы не получите array<struct<...>>
, вам следует использовать стандартный tuple
.В этом случае базовая функция синтаксического анализа может быть реализована следующим образом *:
from typing import List, Tuple
def extract_fields(row: str) -> Tuple[str]:
# Here we assume that each element has the expected number of fields
# In practice you should validate the data
return tuple(row.split(","))
и предоставить схему вывода для udf
:
schema = ("array<struct<"
"field1: string, field2: string, field3: string, field4: string"
">>")
@sqlfn.udf(schema)
def extract_multile_fields(rows: List[str]) -> List[Tuple[str]]:
return [extract_fields(row) for row in rows]
result = df.select(extract_multile_fields("x"))
result.show(truncate=False)
+--------------------------------------------------------------------------+
|extract_multile_fields(x) |
+--------------------------------------------------------------------------+
|[[field1, field2, field3, field4], [field1, field2, field3, field4]]|
+--------------------------------------------------------------------------+
Есликоличество полей велико, вы можете предпочесть конструировать схему программно, используя строку DDL:
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
schema = ArrayType(StructType(
[StructField(f"field$i", StringType()) for i in range(1, 5)]
))
В Spark 2.4 или новее вы также можете напрямую использовать встроенные функции:
from pyspark.sql.column import Column
def extract_multile_fields_(col: str) -> Column:
return sqlfn.expr(f"""transform(
-- Here we parameterize input with {{}}
transform(`{col}`, s -> split(s, ',')),
-- Adjust the list of fields and cast if necessary
a -> struct(
a[0] as field1, a[1] as field2, a[2] as field3, a[3] as field4)
)""")
result = df.select(extract_multile_fields_("x").alias("records"))
result.show(truncate=False)
+--------------------------------------------------------------------------+
|records |
+--------------------------------------------------------------------------+
|[[field1, field2, field3, field4], [field1, field2, field3, field4]]|
+--------------------------------------------------------------------------+
Неизвестная форма :
Если форма данных неизвестна, то array<struct<...>>
не является правильным выбором DataType
.В этом случае вы можете попытаться использовать array<map<..., ...>>
, однако для этого необходимо, чтобы все значения были одного типа:
from typing import Dict
def extract_fields(row: str) -> Dict[str, str]:
return ... # TODO: Implement the logic
@sqlfn.udf("array<map<string, string>>")
def extract_multile_fields(rows: List[str]) -> List[Dict[str, str]]:
return [extract_fields(row) for row in rows]
* Обратите внимание, что все записи должны иметь одинаковую форму,Если некоторые поля отсутствуют.Вы должны заполнить пробелы с None
.