PySpark: PandasUDFType.GROUPED_MAP с фреймом данных в качестве входных данных? - PullRequest
0 голосов
/ 03 апреля 2020

Я получил таблицу Spark df со следующей схемой:

root
 |-- sample: string (nullable = true)
 |-- zygocity: integer (nullable = true)
 |-- transcript_id: string (nullable = true)
 |-- gene_id: string (nullable = true)
 |-- tssStart: long (nullable = true)
 |-- variant.contigName: string (nullable = true)
 |-- variant.start: long (nullable = true)
 |-- variant.end: long (nullable = true)
 |-- region_of_interest.contigName: string (nullable = true)
 |-- region_of_interest.start: long (nullable = true)
 |-- region_of_interest.end: long (nullable = true)
 |-- region_of_interest.strand: string (nullable = true)

Теперь я хотел бы сгруппировать таблицу и агрегировать с Pandas UDF (spark_window_overlap). window_overlap - это функция, которая принимает Pandas фрейм данных со структурой df.

window_overlap_output_schema = t.StructType([
    t.StructField("counts", t.ArrayType(t.LongType())),
    t.StructField("starts", t.ArrayType(t.LongType())),
    t.StructField("ends", t.ArrayType(t.LongType())),
])

spark_window_overlap = f.pandas_udf(lambda df: window_overlap(df), window_overlap_output_schema, f.PandasUDFType.GROUPED_AGG)

df.groupby(
    "sample",
    "zygocity", # zygocity
    "gene_id",
    "transcript_id",
    "tssStart",
    "`region_of_interest.contigName`",
    "`region_of_interest.start`",
    "`region_of_interest.end`",
    "`region_of_interest.strand`",
).agg(spark_window_overlap(*[f.expr("`" + c + "`") for c in df.columns]))

Однако я всегда получаю следующую ошибку:

NotImplementedError: Invalid  return type with grouped aggregate Pandas UDFs: StructType(List(StructField(counts,ArrayType(LongType,true),true),StructField(starts,ArrayType(LongType,true),true),StructField(ends,ArrayType(LongType,true),true))) is not supported

Что такое Я делаю не так? Это проще с PySpark 3.0?

...