как запустить foreachRDD с аргументами внутри цикла for - PullRequest
0 голосов
/ 23 марта 2019

У меня есть цикл for, создающий kafka DStreams путем перебора списка названий тем. Затем я хочу применить функцию к каждому СДР, но мне нужно передать имя темы в функцию. Я вижу, что внутри функции доступно только последнее значение темы (topic3). Я так понимаю, потому что foreachRDD лениво выполняется. Есть ли способ пройти в названии темы? Я должен выполнить это без использования структурированной потоковой передачи.

topics = ["topic1", "topic2", "topic3"]
ssc = StreamingContext(spark_context, 5)

def process_topic(rdd, topic_value):
    if not rdd.isEmpty():
        print "topic : "+topic_value
        df = rdd.toDF().show()

for topic in topics:
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],{'metadata.broker.list': 'broker_url', 'auto.offset.reset': 'smallest'})
    lines = directKafkaStream.map(lambda v: json.loads(v[1]))
    lines.foreachRDD(lambda x: process_topic(x, topic))

ssc.start()
time.sleep(10)
ssc.stop()

Вывод выглядит так:

"topic3"

(дф для темы1)

"topic3"

(дф для темы2)

"topic3"

(дф для темы3)

Хотелось бы увидеть:

"topic1"

(дф для темы1)

"topic2"

(df для theme2)

"topic3"

(df для theme3)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...