Я получил таблицу Spark df
со следующей схемой:
root
|-- sample: string (nullable = true)
|-- zygocity: integer (nullable = true)
|-- transcript_id: string (nullable = true)
|-- gene_id: string (nullable = true)
|-- tssStart: long (nullable = true)
|-- variant.contigName: string (nullable = true)
|-- variant.start: long (nullable = true)
|-- variant.end: long (nullable = true)
|-- region_of_interest.contigName: string (nullable = true)
|-- region_of_interest.start: long (nullable = true)
|-- region_of_interest.end: long (nullable = true)
|-- region_of_interest.strand: string (nullable = true)
Теперь я хотел бы сгруппировать таблицу и агрегировать с Pandas UDF (spark_window_overlap
). window_overlap
- это функция, которая принимает Pandas фрейм данных со структурой df
.
window_overlap_output_schema = t.StructType([
t.StructField("counts", t.ArrayType(t.LongType())),
t.StructField("starts", t.ArrayType(t.LongType())),
t.StructField("ends", t.ArrayType(t.LongType())),
])
spark_window_overlap = f.pandas_udf(lambda df: window_overlap(df), window_overlap_output_schema, f.PandasUDFType.GROUPED_AGG)
df.groupby(
"sample",
"zygocity", # zygocity
"gene_id",
"transcript_id",
"tssStart",
"`region_of_interest.contigName`",
"`region_of_interest.start`",
"`region_of_interest.end`",
"`region_of_interest.strand`",
).agg(spark_window_overlap(*[f.expr("`" + c + "`") for c in df.columns]))
Однако я всегда получаю следующую ошибку:
NotImplementedError: Invalid return type with grouped aggregate Pandas UDFs: StructType(List(StructField(counts,ArrayType(LongType,true),true),StructField(starts,ArrayType(LongType,true),true),StructField(ends,ArrayType(LongType,true),true))) is not supported
Что такое Я делаю не так? Это проще с PySpark 3.0?