Фильтровать поток кафки по темам - PullRequest
0 голосов
/ 22 апреля 2019

Фильтр Kafka из нескольких тем по темам.

kafka_stream = KafkaUtils.createStream(ssc, zookeeper_server, groupId='group-0', topics={'topic1': 1,'topic2': 1}, valueDecoder=lambda v: json.loads(v))

Мне нравится фильтровать поток по темам theme1 и topic2, скажем, kafka_stream_topic1 и kafka_steeam_topic2, а затем обрабатывать его отдельно.

 kafka_stream_topic1.foreachRDD(lambda rdd: rdd.foreach(process_func_for_topic1))
 kafka_stream_topic2.foreachRDD(lambda rdd: rdd.foreach(process_func_for_topic2))

Фильтровать поток по каждой теме, а затем обрабатывать отдельно.

...