Использование Кафки с Микронавтом - PullRequest
0 голосов
/ 02 ноября 2018

Есть ли примеры проектов, показывающих, как использовать Kafka с Micronaut? У меня проблемы с тем, чтобы заставить его работать.

У меня есть следующий производитель:

@KafkaClient
interface AppClient {

@Topic("topic-name")
    void sendMessage(@KafkaKey String id, Event event)
}

и слушатель:

@KafkaListener(
    groupId="group-id",
    offsetReset = OffsetReset.EARLIEST
)
class AppListener {

@Topic("topic-name")
    void onMessage(Event event) {
        // do stuff
    }
}

Мой application.yml содержит:

kafka:
  bootstrap:
    servers: localhost:2181

и application-test.yml (это правильно и должно ли оно находиться в том же каталоге, что и application.yml?. Также не знаете, как использовать встроенный сервер):

kafka:
  #  embedded:
  #    enabled: true
  #    topics: promo-api-promotions
  bootstrap:
    servers: localhost:9092

Мой тест выглядит так:

@MicronautTest
class AppSpec extends Specification {

@Shared
@AutoCleanup
EmbeddedServer server = ApplicationContext.run(EmbeddedServer)

@Shared
private AppClient appClient =
        server.applicationContext.getBean(AppClient)

def 'The upload endpoint is called'() {
  // test here
  appClient.sendMessage(id, event)
  // other test stuff
}

Основные проблемы, с которыми я сталкиваюсь:

  1. Мой потребитель не потребляет из моей темы. Я вижу, что производитель создает тему в Kafka, и создается группа клиентов, но смещение остается равным 0.

  2. У меня возникают проблемы при запуске теста, когда он выглядит так, как будто два экземпляра клиента созданы, и, следовательно, регистрация MBean не выполняется (также, если я пытаюсь использовать встроенный Kafka, я получаю другое сообщение о том, что порт 9092 уже используется, поскольку он пытается запустить сервер дважды):

    javax.management.InstanceAlreadyExistsException: kafka.consumer: тип = приложение-инфо, идентификатор = приложение-Кафка-клиент-приложение-приемник на com.sun.jmx.mbeanserver.Repository.addMBean (Repository.java:437) в com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository (DefaultMBeanServerInterceptor.java:1898)

Ответы [ 2 ]

0 голосов
/ 04 ноября 2018

Вы должны добавить встроенную конфигурацию kafka.embedded.enabled на карту с конфигурацией и передать ее в метод ApplicationContext.run.

Map<String, Object> config = Collections.
    unmodifiableMap(new HashMap<String, Object>() {
        {
            put(AbstractKafkaConfiguration.EMBEDDED, true);
            put(AbstractKafkaConfiguration.EMBEDDED_TOPICS, "test_topic");
    }
});

try (ApplicationContext ctx = ApplicationContext.run(config)) {

Потребитель потребляет от Кафки в другом потоке, и вам придется подождать некоторое время, пока ваш AppListener не догонит. Вы можете увидеть короткий пример в KafkaProducerListenerTest

Запомните зависимости Кафки, описанные в документе Micronaut: Встраивание Кафки

0 голосов
/ 02 ноября 2018

Удалось решить вторую проблему - у объекта, переданного в слушатель, не было @JsonCreator. Я выяснил это, пытаясь использовать сопоставитель объектов Джексона для создания объекта из его JSON во время игры.

Если у кого-то еще есть такая же проблема - убедитесь, что объектная модель работает с Джексоном, прежде чем идти дальше!

...