Если я создаю и запускаю экземпляр KafkaStream, а затем при вызове перехватчика shutdown .close () ничего не происходит - запись в журнал указывает, что один существующий StreamThread вошел в состояние PENDING_SHUTDOWN, но затем просто остается таким навсегда.Я попробовал все без удачи - прочитал исходный код потоков Kafka, чтобы увидеть, что он делает во время выключения, но на мой взгляд код подразумевает, что StreamThread, если он находится в рабочем состоянии, никогда не остановится (что не может быть правильным- Я не видел никаких ошибок такого рода, зарегистрированных в Kafka JIRA).
Вот соответствующий код моего простого приложения KafkaStream (scala):
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
p
}
implicit val produced = Produced.`with`(new StringSerde(), new StringSerde())
val builder: StreamsBuilder = new StreamsBuilder()
val in: KStream[String, String] = builder.stream[String, String]("input-topic")
in.map((k,v) =>{
println("Consumed and transforming value")
(k,s"$v_transformed")
}).to("output-topic")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.addShutdownHook(streams.stop())
Как вы можетевидите, это просто чтение из одной темы, небольшое изменение значения из использованной записи и запись его в другую тему.
Поток запущен, и приложение вызовет stop () при отправке SIGINTк нему.
Когда я ^ C, чтобы завершить процесс, я вижу, как Кафка записывает в журнал сообщение о том, что StreamThread-1 переводится в PENDING_SHUTDOWN, и это так далеко, как только может.В конечном итоге он должен (в течение нескольких секунд) достичь состояния NOT_RUNNING, но никогда не достигнет этого, и он продолжает выводить мой оператор println для каждой записи, которую он продолжает читать из темы ввода.
Что я здесь не так делаю?
ОБНОВЛЕНИЕ: согласно предложению от комментатора, я попытался вызвать close () с тайм-аутом 60 секунд, и в итоге получил это, но по-прежнему без выключения: -
[shutdownHook1]ИНФОРМАЦИЯ o.apache.kafka.streams.KafkaStreams - stream-client [-] Потоковый клиент не может полностью остановиться в течение времени ожидания