У меня есть цикл for, создающий kafka DStreams путем перебора списка названий тем. Затем я хочу применить функцию к каждому СДР, но мне нужно передать имя темы в функцию. Я вижу, что внутри функции доступно только последнее значение темы (topic3). Я так понимаю, потому что foreachRDD лениво выполняется. Есть ли способ пройти в названии темы? Я должен выполнить это без использования структурированной потоковой передачи.
topics = ["topic1", "topic2", "topic3"]
ssc = StreamingContext(spark_context, 5)
def process_topic(rdd, topic_value):
if not rdd.isEmpty():
print "topic : "+topic_value
df = rdd.toDF().show()
for topic in topics:
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],{'metadata.broker.list': 'broker_url', 'auto.offset.reset': 'smallest'})
lines = directKafkaStream.map(lambda v: json.loads(v[1]))
lines.foreachRDD(lambda x: process_topic(x, topic))
ssc.start()
time.sleep(10)
ssc.stop()
Вывод выглядит так:
"topic3"
(дф для темы1)
"topic3"
(дф для темы2)
"topic3"
(дф для темы3)
Хотелось бы увидеть:
"topic1"
(дф для темы1)
"topic2"
(df для theme2)
"topic3"
(df для theme3)