Как сохранить данные для последующей обработки после остановки DirectStream в SparkStreaming? - PullRequest
0 голосов
/ 09 июня 2018

Я создаю ниже KafkaDirectStream.

  val messages = KafkaUtils.createDirectStream[String, String](
         ssc,
         LocationStrategies.PreferConsistent,
         ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

Затем сохраняем значения как:

  val lines = messages.map(_.value)

Затем прекращаем потоковый контекст, когда у меня нет дальнейшего смещения для потребления следующим образом:

  lines.foreachRDD(rdd => {
      if(rdd.isEmpty()) {
        messages.stop()
        ssc.stop(false)
      } else {

      }
  })

Затем я печатаю lines следующим образом:

    lines.print()

Затем я запускаю поток как:

    ssc.start()

Он работает нормально.Он читает rdds и печатает топ-10, останавливает поток сообщений и останавливает потоковый контекст.Но затем, когда я выполняю ту же строку lines.print(), она выдает исключение, говорящее, что не может делать новые входы, преобразовывать или выводить после остановки streamingContext.

Как мне достичь своей цели?Я запускаю его в спарк-оболочке, а не в двоичном формате (обязательное требование).

Вот что я на самом деле хочу достичь:

1) Использовать все записи json из темы kafka.

2) Прекратить получать дальнейшие записи (Гарантируется, что после потребления новые записи не будут добавлены в тему Кафки, поэтому не стоит продолжать обрабатывать никаких записей.)

3) Выполните некоторую предварительную обработку, извлекая некоторые поля из полей JSON.

4) Выполните дальнейшие операции с предварительно обработанными данными.

5) Готово.

1 Ответ

0 голосов
/ 09 июня 2018

когда вы снова вызываете "lines.print ()", он пытается снова вызвать преобразование "messages.map (_. Value)".Как вы остановили контекст его провал.

Сохраните переменную lines, выполнив действие перед остановкой контекста.

...