KafkaStreams не выключается - PullRequest
       0

KafkaStreams не выключается

0 голосов
/ 12 сентября 2018

Если я создаю и запускаю экземпляр KafkaStream, а затем при вызове перехватчика shutdown .close () ничего не происходит - запись в журнал указывает, что один существующий StreamThread вошел в состояние PENDING_SHUTDOWN, но затем просто остается таким навсегда.Я попробовал все без удачи - прочитал исходный код потоков Kafka, чтобы увидеть, что он делает во время выключения, но на мой взгляд код подразумевает, что StreamThread, если он находится в рабочем состоянии, никогда не остановится (что не может быть правильным- Я не видел никаких ошибок такого рода, зарегистрированных в Kafka JIRA).

Вот соответствующий код моего простого приложения KafkaStream (scala):

val props: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
 p
}


implicit val produced = Produced.`with`(new StringSerde(), new StringSerde())

val builder: StreamsBuilder = new StreamsBuilder()
val in: KStream[String, String] = builder.stream[String, String]("input-topic")

in.map((k,v) =>{ 
         println("Consumed and transforming value")
         (k,s"$v_transformed") 
     }).to("output-topic")

val streams: KafkaStreams = new KafkaStreams(builder.build(), props)

streams.start()

sys.addShutdownHook(streams.stop())

Как вы можетевидите, это просто чтение из одной темы, небольшое изменение значения из использованной записи и запись его в другую тему.

Поток запущен, и приложение вызовет stop () при отправке SIGINTк нему.

Когда я ^ C, чтобы завершить процесс, я вижу, как Кафка записывает в журнал сообщение о том, что StreamThread-1 переводится в PENDING_SHUTDOWN, и это так далеко, как только может.В конечном итоге он должен (в течение нескольких секунд) достичь состояния NOT_RUNNING, но никогда не достигнет этого, и он продолжает выводить мой оператор println для каждой записи, которую он продолжает читать из темы ввода.

Что я здесь не так делаю?

ОБНОВЛЕНИЕ: согласно предложению от комментатора, я попытался вызвать close () с тайм-аутом 60 секунд, и в итоге получил это, но по-прежнему без выключения: -

[shutdownHook1]ИНФОРМАЦИЯ o.apache.kafka.streams.KafkaStreams - stream-client [-] Потоковый клиент не может полностью остановиться в течение времени ожидания

...