Действия Spark Streaming Run для DStream асинхронно - PullRequest
0 голосов
/ 14 октября 2018

Я пишу программу для приема данных.Для чтения из Kafka в DStream разделите Dstrem на 3 потока и выполните действия для каждого из них:

val stream = createSparkStream(Globals.configs, ssc)
val s1 = stream.filter(<predicat1>)
val s2 = stream.filter(<predicat2>)
val s3 = stream.filter(<predicat3>)

//I'm looking for something like:
s1.forEachRddAsync(...
s2.forEachRddAsync(...
s3.forEachRddAsync(... 

Если возможно инициировать асинхронную отправку для всего DStream, а не для RDD.

1 Ответ

0 голосов
/ 14 октября 2018

DStream методы действия, хотя и блокируют, но не обрабатывают данные.Они регистрируют только DStream в качестве выходного потока.

После запуска StreamingContext обработка будет планироваться в соответствии с доступными ресурсами и, если это разрешено, обрабатываться без ограничения друг друга.

...