Возможно, вы отправляете сообщение до того, как получателю будет назначена тема / раздел. Установить свойство ...
spring:
kafka:
consumer:
auto-offset-reset: earliest
... по умолчанию latest
.
Это похоже на использование --from-beginning
с консольным потребителем.
EDIT
О; вы не используете свойства загрузки.
Добавить
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
Кстати, вам, вероятно, следует также сделать get(10L, TimeUnit.SECONDS)
в результате template.send()
(Future<>
), чтобы подтвердить, что отправка прошла успешно.
EDIT3
Чтобы переопределить сброс смещения только для теста, вы можете сделать то же самое, что вы сделали для адресов брокера:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
и
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
Однако имейте в виду, что это свойство применяется только при первом использовании группы. Чтобы всегда запускаться в конце при каждом запуске приложения, вы должны искать его до конца во время запуска.
Кроме того, я бы рекомендовал установить enable.auto.commit
в false
, чтобы контейнер заботился о фиксации смещений, а не просто полагался на то, что клиент-потребитель делает это по расписанию.