Я создаю ниже KafkaDirectStream.
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
Затем сохраняем значения как:
val lines = messages.map(_.value)
Затем прекращаем потоковый контекст, когда у меня нет дальнейшего смещения для потребления следующим образом:
lines.foreachRDD(rdd => {
if(rdd.isEmpty()) {
messages.stop()
ssc.stop(false)
} else {
}
})
Затем я печатаю lines
следующим образом:
lines.print()
Затем я запускаю поток как:
ssc.start()
Он работает нормально.Он читает rdds и печатает топ-10, останавливает поток сообщений и останавливает потоковый контекст.Но затем, когда я выполняю ту же строку lines.print()
, она выдает исключение, говорящее, что не может делать новые входы, преобразовывать или выводить после остановки streamingContext.
Как мне достичь своей цели?Я запускаю его в спарк-оболочке, а не в двоичном формате (обязательное требование).
Вот что я на самом деле хочу достичь:
1) Использовать все записи json из темы kafka.
2) Прекратить получать дальнейшие записи (Гарантируется, что после потребления новые записи не будут добавлены в тему Кафки, поэтому не стоит продолжать обрабатывать никаких записей.)
3) Выполните некоторую предварительную обработку, извлекая некоторые поля из полей JSON.
4) Выполните дальнейшие операции с предварительно обработанными данными.
5) Готово.