весенняя кафка - многократное потребительское чтение из одной темы - PullRequest
0 голосов
/ 10 февраля 2020

Я создаю веб-приложение с использованием весенней загрузки, и теперь у меня есть требование получать уведомления в режиме реального времени. Я планирую использовать apache kafka в качестве брокера сообщений для этого. Требование таково, что есть пользователи с разными ролями и в зависимости от роли они должны получать уведомления о том, что делают другие пользователи. Я настроил одного производителя и потребителя, и, как потребитель, я мог получать информацию, опубликованную в topi c, скажем, theme1. Я застрял в том, что у меня может быть несколько пользователей, слушающих одну и ту же топи c, и каждый пользователь должен опубликовать сообщение для этой топи c. Я понимаю, что для этого требования нам нужно установить разные group.id для каждого kafkalistener, чтобы каждый потребитель мог получить сообщение.
Но как я собираюсь создать kafkalistener с другим идентификатором группы, когда пользователь вошел в систему? Надеюсь, кто-то может дать некоторые рекомендации по этому поводу? Спасибо

1 Ответ

1 голос
/ 10 февраля 2020

Просто создайте новый KafkaMessageListenerContainer каждый раз и запускайте / останавливайте его при необходимости.

Вы можете использовать автоматически сконфигурированную Boot ConcurrentKafkaListenerContainerFactory для создания контейнеров. Просто установите свойство контейнера groupId, чтобы сделать их уникальными.

РЕДАКТИРОВАТЬ

Вот пример:

@SpringBootApplication
public class So60150686Application {

    public static void main(String[] args) {
        SpringApplication.run(So60150686Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60150686", "foo");
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
    }

}

@RestController
class Web {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    @GetMapping(path="/foo/{group}")
    public String foo(@PathVariable String group) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
        container.getContainerProperties().setGroupId(group);
        container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {

            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record);
            }

        });
        container.start();
        return "ok";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200 
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60

ok

2020-02-10 14: 42: 09.744 INFO 34096 --- [consumer-0- C -1] osklKafkaMessageListenerContainer: bar: назначенные разделы: [so60150686-0]

ConsumerRecord ( topi c = so60150686, раздел = 0, leaderEpoch = 0, смещение = 1, CreateTime = 1581363648938, размер сериализованного ключа = -1, размер сериализованного значения = 3, заголовки = RecordHeaders (headers = [], isReadOnly = false), ключ = ноль, значение = foo)

...