Просто создайте новый KafkaMessageListenerContainer
каждый раз и запускайте / останавливайте его при необходимости.
Вы можете использовать автоматически сконфигурированную Boot ConcurrentKafkaListenerContainerFactory
для создания контейнеров. Просто установите свойство контейнера groupId
, чтобы сделать их уникальными.
РЕДАКТИРОВАТЬ
Вот пример:
@SpringBootApplication
public class So60150686Application {
public static void main(String[] args) {
SpringApplication.run(So60150686Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so60150686", "foo");
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
}
}
@RestController
class Web {
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
this.factory = factory;
}
@GetMapping(path="/foo/{group}")
public String foo(@PathVariable String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
container.getContainerProperties().setGroupId(group);
container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(record);
}
});
container.start();
return "ok";
}
}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60
ok
2020-02-10 14: 42: 09.744 INFO 34096 --- [consumer-0- C -1] osklKafkaMessageListenerContainer: bar: назначенные разделы: [so60150686-0]
ConsumerRecord ( topi c = so60150686, раздел = 0, leaderEpoch = 0, смещение = 1, CreateTime = 1581363648938, размер сериализованного ключа = -1, размер сериализованного значения = 3, заголовки = RecordHeaders (headers = [], isReadOnly = false), ключ = ноль, значение = foo)