Я пытаюсь использовать KStream-KTable leftJoin для обогащения элемента из topi c A с помощью Topi c B. Topi c A - это мой KStream, а topi c B - это мой KTtable, который имеет около 23M записей. Ключи из обеих тем не объединены, поэтому я должен преобразовать KStream (topi c B) в KTable с помощью редуктора.
Вот мой код:
KTable<String, String> ktable = streamsBuilder
.stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
.filter((key, value) -> {...})
.transform(new KeyTransformer()) // generate new key
.groupByKey()
.reduce((aggValue, newValue) -> {...});
streamBuilder
.stream("TopicA")
.filter((key, value) -> {...})
.transform(...)
.leftJoin(ktable, new ValueJoiner({...}))
.transform(...)
.to("result")
1) KTable инициализация идет медленно. (около 2000 мсг / с), это нормально? Мой топи c имеет только 1 раздел. Есть ли способ улучшить производительность? Я попытался установить следующее, чтобы уменьшить c пропускную способность записи, но, похоже, не сильно улучшилось.
CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000
2) Объединение происходит, когда KTable не завершает загрузку из Topi c B. Вот смещение, когда происходит соединение (CURRENT-OFFSET / LOG-END-OFFSET)
Topic A: 32725/32726 (Lag 1)
Topic B: 1818686/23190390 (Lag 21371704)
Я проверил отметку времени записи Topi c A, которая не удалась, это запись за 4 дня a go, а последняя обрабатываемая запись Topi c B составляет 6 дней a go. Насколько я понимаю, запись процесса kstream на основе метки времени, я не понимаю, почему в моем случае KStream (Topi c A) не дождался полной загрузки KTable (Topi c B) до момента, когда * 440 * 4 дня, чтобы инициировать соединение.
Я также пытался установить возврат экстрактора метки времени на 0, но он также не работает.
Обновлено: при установке метки времени на 0 я получаю следующую ошибку:
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
Я также пытался установить max.task.idle.ms на> 0 (3 секунды и 30 минут) ), но все равно получаю ту же ошибку.
Обновлено: я исправил ошибку «UnknownProducerIdException», установив customTimestampsExtractor равным 6 дням go, который все еще раньше, чем запись из Topi c A. Я думаю (не уверен), что установка 0 удержания триггера в журнале изменений, вызвавшем эту ошибку. Тем не менее, соединение все еще не работает там, где это происходит до того, как ktable завершит загрузку. Почему это так?
Я использую Kafka Streams 2.3.0.
Я что-то здесь не так делаю? Большое спасибо.