KStream-KTable LeftJoin, Присоединение произошло, когда KTable загружен не полностью - PullRequest
0 голосов
/ 28 апреля 2020

Я пытаюсь использовать KStream-KTable leftJoin для обогащения элемента из topi c A с помощью Topi c B. Topi c A - это мой KStream, а topi c B - это мой KTtable, который имеет около 23M записей. Ключи из обеих тем не объединены, поэтому я должен преобразовать KStream (topi c B) в KTable с помощью редуктора.

Вот мой код:

KTable<String, String> ktable = streamsBuilder
     .stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
     .filter((key, value) -> {...})
     .transform(new KeyTransformer()) // generate new key
     .groupByKey()
     .reduce((aggValue, newValue) -> {...});

streamBuilder
     .stream("TopicA")
     .filter((key, value) -> {...})
     .transform(...)
     .leftJoin(ktable, new ValueJoiner({...}))
     .transform(...)
     .to("result")

1) KTable инициализация идет медленно. (около 2000 мсг / с), это нормально? Мой топи c имеет только 1 раздел. Есть ли способ улучшить производительность? Я попытался установить следующее, чтобы уменьшить c пропускную способность записи, но, похоже, не сильно улучшилось.

CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000

2) Объединение происходит, когда KTable не завершает загрузку из Topi c B. Вот смещение, когда происходит соединение (CURRENT-OFFSET / LOG-END-OFFSET)

   Topic A: 32725/32726 (Lag 1)
   Topic B: 1818686/23190390 (Lag 21371704)

Я проверил отметку времени записи Topi c A, которая не удалась, это запись за 4 дня a go, а последняя обрабатываемая запись Topi c B составляет 6 дней a go. Насколько я понимаю, запись процесса kstream на основе метки времени, я не понимаю, почему в моем случае KStream (Topi c A) не дождался полной загрузки KTable (Topi c B) до момента, когда * 440 * 4 дня, чтобы инициировать соединение.

Я также пытался установить возврат экстрактора метки времени на 0, но он также не работает.

Обновлено: при установке метки времени на 0 я получаю следующую ошибку:

Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. 

Я также пытался установить max.task.idle.ms на> 0 (3 секунды и 30 минут) ), но все равно получаю ту же ошибку.

Обновлено: я исправил ошибку «UnknownProducerIdException», установив customTimestampsExtractor равным 6 дням go, который все еще раньше, чем запись из Topi c A. Я думаю (не уверен), что установка 0 удержания триггера в журнале изменений, вызвавшем эту ошибку. Тем не менее, соединение все еще не работает там, где это происходит до того, как ktable завершит загрузку. Почему это так?

Я использую Kafka Streams 2.3.0.

Я что-то здесь не так делаю? Большое спасибо.

1 Ответ

1 голос
/ 28 апреля 2020

1. инициализация KTable идет медленно. (около 2000 мсг / с), это нормально?

Это зависит от вашей сети, и я думаю, что ограничение - это скорость потребления TopicB, две конфигурации CACHE_MAX_BYTES_BUFFERING_CONFIG и COMMIT_INTERVAL_MS_CONFIG, которые вы используете для выберите компромисс между тем, сколько выходных данных KTable вы хотите произвести (потому что журнал изменений KTable является потоком ревизий), и какую задержку вы принимаете, когда вы обновляете KTable до базового topi c и последующего процессора. Подробно рассмотрим конфигурацию кэширования Kafka Streams для хранилища состояний и этого блога part Tables, Not Triggers.

Я думаю, что это хороший способ увеличить скорость потребления TopicB это добавить еще раздел.

KStream.leftJoin(KTable,...) - это всегда поиск по таблице, он всегда объединяет текущую запись потока с последней обновленной записью в KTable, он не будет учитывать время потока при принятии решения, присоединяться или нет. Если вы хотите учесть время потока при присоединении, взгляните на KStream-KStream join .

В вашем случае это отставание составляет TopicB, , это не значит, что KTable загружен не полностью . Ваш KTable не полностью загружен, когда он находится в процессе восстановления состояния, когда он читается из основного списка изменений topi c из KTable, чтобы восстановить текущее состояние перед фактическим запуском вашего потокового приложения, на всякий случай вы не сможете выполнить соединение, потому что поток приложение не работает, пока состояние полностью не восстановится.

...