Структурированная потоковая передача - запуск другой функции по ключу для сообщения в одном пакете - PullRequest
0 голосов
/ 13 мая 2018

Я реализовал потребитель Kafka в PySpark (Spark 2.2.1)

Я использую технологию структурированной потоковой передачи.

Каждая принимающая партия содержит сообщения, в которых столбец значений относится к разным типам (различаются по ключу), а его схема отличается.

Теперь я хочу запустить другую логику для столбца значения в соответствии с ключом столбца сообщения.

Как я могу сгруппировать (по ключу) полученные сообщения и запустить определенную логику для каждой группы в PySpark?

Вот фрагмент из моего кода, который работает, когда все сообщения одного типа:

lines = spark.readStream.format("kafka")...

schema_struct = get_schema_struct(...)

udf_identical_func = udf(
    lambda k,msg: do_something_identical_to_all_types(k,msg)), schema_struct
)

res = lines.select(udf_identical_func("key","value"))

udf_specific_toEach_type_func = udf(
    lambda k,msg: do_something_specific_to_each_type(k,msg)), schema_struct
)

res= res.select(udf_specific_toEach_type_func)
q = res.writeStream....
q.awaitTermination()

Спасибо за любую помощь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...