Uisng pyspark.sql.types.StructType.fromJson()
вы можете динамически построить схему из json.
В соответствии с вашим требованием я изменил тип данных, используя для «col_e», вы можете изменить DataTypes на один или несколько столбцов в зависимости от вашего варианта использования.
df = spark.read.csv('test.csv',header=True,inferSchema=True)
fields = []
for f in json.loads(df.schema.json())["fields"]:
if f["name"] == "col_e":
fields.append(StructField("col_e", StringType(), True))
else:
fields.append(StructField.fromJson(f))
schema = StructType(fields)
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def many_cols_data(pdf):
pdf['col_e'] = "test"
return pdf
df.groupBy(
'col_a'
).apply(
many_cols_data
).show()
тест входного файла. csv
col_a,col_b,col_c,col_d,col_e
a,2,3,4,5
b,2,3,4,5
c,2,3,4,5
, что дает
+-----+-----+-----+-----+-----+
|col_a|col_b|col_c|col_d|col_e|
+-----+-----+-----+-----+-----+
| c| 2| 3| 4| test|
| b| 2| 3| 4| test|
| a| 2| 3| 4| test|
+-----+-----+-----+-----+-----+