См. Обновление ниже, чтобы показать потенциальный обходной путь
Наше приложение использует 2 темы в виде KTables, выполняет левое соединение и выводит в topi c. Во время тестирования мы обнаружили, что это работает должным образом, когда наш вывод topi c имеет только 1 раздел. Когда мы увеличиваем количество разделов, мы замечаем, что количество сообщений, которые выводятся на вывод topi c, уменьшается.
Мы проверили эту теорию с несколькими конфигурациями разделов перед запуском приложения. С 1 разделом мы видим 100% сообщений. С 2 мы видим некоторые сообщения (менее 50%). При 10 мы почти ничего не видим (менее 10%).
Поскольку нам осталось присоединиться, каждое отдельное сообщение, полученное от Topi c 1, должно быть записано в наш вывод topi c, но мы обнаруживаем, что этого не происходит. Кажется, что сообщения застревают в "промежуточных" темах, созданных из соединения внешнего ключа Ktables, но сообщений об ошибках нет.
Любая помощь будет принята с благодарностью!
Сервис. java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
build.gradle
plugins {
id 'org.springframework.boot' version '2.3.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
Примечание. Мы исключаем организацию apache .kafka зависимости из-за ошибки в версиях, включенных в spring-cloud-stream
application.yml
spring:
application:
name: app-name
stream:
bindings:
process-in-0:
destination: topic1
group: ${spring.application.name}
process-in-1:
destination: topic2
group: ${spring.application.name}
process-out-0:
destination: outputTopic
kafka:
streams:
binder:
applicationId: ${spring.application.name}
brokers: ${KAFKA_BROKERS}
configuration:
commit.interval.ms: 1000
producer:
acks: all
retries: 20
default:
key:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
value:
serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
min-partition-count: 2
Сценарий тестирования:
В качестве конкретного примера, если я опубликую sh следующие 3 сообщения в Topi c 1:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
Выходной topi c получит только 2 сообщения.
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
Что случилось с двумя другими? Кажется, что некоторые пары ключ / значение просто не могут быть записаны в выходной файл topi c. Повторная попытка этих "потерянных" сообщений тоже не работает.
Обновление:
Мне удалось заставить это функционировать должным образом, используя Topi c 1 как KStream вместо KTable и позвонив по номеру toTable()
перед тем, как продолжить соединение KTable-KTable. Я до сих пор не понимаю, почему мое исходное решение не работает, но, надеюсь, этот обходной путь может пролить свет на реальную проблему.
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.map(...)
.toTable()
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}