Понимание max.task.idle.ms в Kafka Stream для соединения KStream-KTable - PullRequest
0 голосов
/ 21 мая 2019

Мне нужна помощь в понимании поведения потока Kafka, когда max.task.idle.ms используется в Kafka 2.2.

У меня есть соединение KStream-KTable, где KStream был изменен:

KStream stream1 = builder.stream("topic1", Consumed.with(myTimeExtractor));
KStream stream2 = builder.stream("topic2", Consumed.with(myTimeExtractor));

KTable table = stream1
       .groupByKey()
       .aggregate(myInitializer, myAggregator, Materialized.as("myStore"))

stream2.selectKey((k,v)->v)
       .through("rekeyedTopic")
       .join(table, myValueJoiner)
       .to("enrichedTopic");

У всех тем есть 10 разделов, и для тестирования я установил max.task.idle.ms на 2 минуты. myTimeExtractor обновляет время события сообщений, только если они помечены как «снимок»: каждому сообщению моментального снимка в stream1 присваивается время события, равное некоторой константе T, сообщениям в stream2 присваивается время события T + 1.

При каждом вызове KafkaStreams # start в каждой из theme1 и topic2 присутствуют 200 сообщений, все помечены как «снимок», и после этого сообщение не добавляется. Я вижу, что в течение секунды или около того myStore и rekeyedTopic заполняются. Поскольку время событий сообщений в таблице меньше, чем время событий сообщений в потоке, я понимаю (из чтения https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization)), что я должен увидеть результат соединения (в enrichedTopic) вскоре после myStore и rekeyedTopic заполняются. Фактически я должен иметь возможность сначала заполнить rekeyedTopic, и до тех пор, пока myStore заполняется менее чем через 2 минуты, соединение все равно будет давать ожидаемый результат.

Это не то, что происходит. Что происходит, так это то, что myStore и rekeyedTopic заполняются в течение первой секунды или около того, затем в течение 2 минут ничего не происходит, и только тогда enrichedTopic заполняется ожидаемыми сообщениями.

Я не понимаю, почему наступает пауза в 2 минуты, прежде чем enrichedTopic заполняется, поскольку все «готово» задолго до этого. Чего мне не хватает?

1 Ответ

0 голосов
/ 22 мая 2019

на основании документации, в которой указано:

max.task.idle.ms - Максимальное количество времени, в течение которого потоковая задача будет простаивать, если не все его буферы разделов содержат записи, чтобы избежать возможного выхода из строя обработка записей в нескольких входных потоках.

Я бы сказал, что это возможно из-за того, что некоторые буферы разделов НЕ содержат записей, поэтому в основном ожидают, чтобы избежать обработки не по порядку до определенного времени, которое вы настроили для свойства.

...