Соединение по внешнему ключу KTable-KTable не создает все сообщения, когда темы имеют более одного раздела - PullRequest
6 голосов
/ 14 июля 2020

См. Обновление ниже, чтобы показать потенциальный обходной путь

Наше приложение использует 2 темы в виде KTables, выполняет левое соединение и выводит в topi c. Во время тестирования мы обнаружили, что это работает должным образом, когда наш вывод topi c имеет только 1 раздел. Когда мы увеличиваем количество разделов, мы замечаем, что количество сообщений, которые выводятся на вывод topi c, уменьшается.

Мы проверили эту теорию с несколькими конфигурациями разделов перед запуском приложения. С 1 разделом мы видим 100% сообщений. С 2 мы видим некоторые сообщения (менее 50%). При 10 мы почти ничего не видим (менее 10%).

Поскольку нам осталось присоединиться, каждое отдельное сообщение, полученное от Topi c 1, должно быть записано в наш вывод topi c, но мы обнаруживаем, что этого не происходит. Кажется, что сообщения застревают в "промежуточных" темах, созданных из соединения внешнего ключа Ktables, но сообщений об ошибках нет.

Любая помощь будет принята с благодарностью!

Сервис. java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}

build.gradle

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
    set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'

Примечание. Мы исключаем организацию apache .kafka зависимости из-за ошибки в версиях, включенных в spring-cloud-stream

application.yml

spring:
  application:
    name: app-name
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2

Сценарий тестирования:

В качестве конкретного примера, если я опубликую sh следующие 3 сообщения в Topi c 1:

{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}

Выходной topi c получит только 2 сообщения.

{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}

Что случилось с двумя другими? Кажется, что некоторые пары ключ / значение просто не могут быть записаны в выходной файл topi c. Повторная попытка этих "потерянных" сообщений тоже не работает.

Обновление:

Мне удалось заставить это функционировать должным образом, используя Topi c 1 как KStream вместо KTable и позвонив по номеру toTable() перед тем, как продолжить соединение KTable-KTable. Я до сих пор не понимаю, почему мое исходное решение не работает, но, надеюсь, этот обходной путь может пролить свет на реальную проблему.

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}

Ответы [ 3 ]

1 голос
/ 03 августа 2020

Учитывая описание проблемы, кажется, что данные во входном поле KTable topi c (слева) неправильно разделены по ключу. Для однораздельного topi c, ну, есть только один раздел, и все данные идут в этот один раздел, и результат соединения завершен.

Однако для многораздельного ввода topi c, вам необходимо убедиться, что данные разделены по ключу, в противном случае две записи с одним и тем же ключом могут оказаться в разных разделах и, таким образом, соединение не будет выполнено (поскольку соединение выполняется для каждого раздела).

Обратите внимание, что даже если соединение по внешнему ключу не требует, чтобы обе входные темы были совместно разделены, все равно требуется, чтобы каждая входная topi c сама была разделена по своему ключу!

Если вы используете map().toTable() вы в основном запускаете внутреннее перераспределение данных, которое гарантирует, что данные будут разделены по ключу, и это решает проблему.

0 голосов
/ 14 июля 2020

Это странная проблема, я никогда не слышал о нескольких разделах вывода topi c, контролирующих частоту записи данных. Однако я знаю, что toStream() записывает данные в нисходящий поток только тогда, когда кеш заполнен, поэтому попробуйте установить cache.max.bytes.buffering = 0. Кроме того, KTable хранит только самую последнюю запись для каждого ключа, поэтому, если у вас есть несколько значений для одного и того же ключа, только последнее значение останется и будет записано ниже по течению.

0 голосов
/ 14 июля 2020

Выбор ключа на объединенных топах c может помочь. Конфигурация разделов тем должна быть такой же.

return (topicOne, topicTwo) ->
        topicOne
            .leftJoin(topicTwo,
                value -> MyOtherKey.newBuilder()
                    .setFieldA(value.getFieldA())
                    .setFieldB(value.getFieldB())
                    .build(),
                this::enrich)
            .toStream().selectKey((key, value) -> key);
...