Мне нужна помощь в понимании поведения потока Kafka, когда max.task.idle.ms используется в Kafka 2.2.
У меня есть соединение KStream-KTable, где KStream был изменен:
KStream stream1 = builder.stream("topic1", Consumed.with(myTimeExtractor));
KStream stream2 = builder.stream("topic2", Consumed.with(myTimeExtractor));
KTable table = stream1
.groupByKey()
.aggregate(myInitializer, myAggregator, Materialized.as("myStore"))
stream2.selectKey((k,v)->v)
.through("rekeyedTopic")
.join(table, myValueJoiner)
.to("enrichedTopic");
У всех тем есть 10 разделов, и для тестирования я установил max.task.idle.ms на 2 минуты. myTimeExtractor обновляет время события сообщений, только если они помечены как «снимок»: каждому сообщению моментального снимка в stream1 присваивается время события, равное некоторой константе T, сообщениям в stream2 присваивается время события T + 1.
При каждом вызове KafkaStreams # start в каждой из theme1 и topic2 присутствуют 200 сообщений, все помечены как «снимок», и после этого сообщение не добавляется. Я вижу, что в течение секунды или около того myStore и rekeyedTopic заполняются. Поскольку время событий сообщений в таблице меньше, чем время событий сообщений в потоке, я понимаю (из чтения https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization)), что я должен увидеть результат соединения (в enrichedTopic) вскоре после myStore и rekeyedTopic заполняются. Фактически я должен иметь возможность сначала заполнить rekeyedTopic, и до тех пор, пока myStore заполняется менее чем через 2 минуты, соединение все равно будет давать ожидаемый результат.
Это не то, что происходит. Что происходит, так это то, что myStore и rekeyedTopic заполняются в течение первой секунды или около того, затем в течение 2 минут ничего не происходит, и только тогда enrichedTopic заполняется ожидаемыми сообщениями.
Я не понимаю, почему наступает пауза в 2 минуты, прежде чем enrichedTopic заполняется, поскольку все «готово» задолго до этого. Чего мне не хватает?