Есть ли примеры проектов, показывающих, как использовать Kafka с Micronaut? У меня проблемы с тем, чтобы заставить его работать.
У меня есть следующий производитель:
@KafkaClient
interface AppClient {
@Topic("topic-name")
void sendMessage(@KafkaKey String id, Event event)
}
и слушатель:
@KafkaListener(
groupId="group-id",
offsetReset = OffsetReset.EARLIEST
)
class AppListener {
@Topic("topic-name")
void onMessage(Event event) {
// do stuff
}
}
Мой application.yml содержит:
kafka:
bootstrap:
servers: localhost:2181
и application-test.yml (это правильно и должно ли оно находиться в том же каталоге, что и application.yml?. Также не знаете, как использовать встроенный сервер):
kafka:
# embedded:
# enabled: true
# topics: promo-api-promotions
bootstrap:
servers: localhost:9092
Мой тест выглядит так:
@MicronautTest
class AppSpec extends Specification {
@Shared
@AutoCleanup
EmbeddedServer server = ApplicationContext.run(EmbeddedServer)
@Shared
private AppClient appClient =
server.applicationContext.getBean(AppClient)
def 'The upload endpoint is called'() {
// test here
appClient.sendMessage(id, event)
// other test stuff
}
Основные проблемы, с которыми я сталкиваюсь:
Мой потребитель не потребляет из моей темы. Я вижу, что производитель создает тему в Kafka, и создается группа клиентов, но смещение остается равным 0.
У меня возникают проблемы при запуске теста, когда он выглядит так, как будто два экземпляра клиента созданы, и, следовательно, регистрация MBean не выполняется (также, если я пытаюсь использовать встроенный Kafka, я получаю другое сообщение о том, что порт 9092 уже используется, поскольку он пытается запустить сервер дважды):
javax.management.InstanceAlreadyExistsException:
kafka.consumer: тип = приложение-инфо, идентификатор = приложение-Кафка-клиент-приложение-приемник
на com.sun.jmx.mbeanserver.Repository.addMBean (Repository.java:437)
в com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository (DefaultMBeanServerInterceptor.java:1898)