Я реализовал потребитель Kafka в PySpark (Spark 2.2.1)
Я использую технологию структурированной потоковой передачи.
Каждая принимающая партия содержит сообщения, в которых столбец значений относится к разным типам (различаются по ключу), а его схема отличается.
Теперь я хочу запустить другую логику для столбца значения в соответствии с ключом столбца сообщения.
Как я могу сгруппировать (по ключу) полученные сообщения и запустить определенную логику для каждой группы в PySpark?
Вот фрагмент из моего кода, который работает, когда все сообщения одного типа:
lines = spark.readStream.format("kafka")...
schema_struct = get_schema_struct(...)
udf_identical_func = udf(
lambda k,msg: do_something_identical_to_all_types(k,msg)), schema_struct
)
res = lines.select(udf_identical_func("key","value"))
udf_specific_toEach_type_func = udf(
lambda k,msg: do_something_specific_to_each_type(k,msg)), schema_struct
)
res= res.select(udf_specific_toEach_type_func)
q = res.writeStream....
q.awaitTermination()
Спасибо за любую помощь