См. документацию .
Вы также можете настроить слушателей POJO с явными темами и разделами (и, по желанию, с их начальными смещениями):
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
Один слушатель может прослушивать несколько тем, но если у вас разные объемы сообщений, я бы порекомендовал отдельного слушателя для каждой темы;в противном случае тема с низким объемом может не получить желаемой активности.
РЕДАКТИРОВАТЬ
Вы можете использовать выражение SpEL для создания массива разделов.
Например;два слушателя, один получает нечетные разделы, а другой получает четные, может быть настроен следующим образом ...
@SpringBootApplication
public class So53588657Application {
public static void main(String[] args) {
SpringApplication.run(So53588657Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so53588657", 50, (short) 1);
}
@KafkaListener(id = "odd", topicPartitions =
@TopicPartition(topic = "so53588657",
partitions = "#{T(com.example.So53588657Application$SplitParts).odds(50)}"))
public void oddParts(String in) {
// ...
}
@KafkaListener(id = "even", topicPartitions =
@TopicPartition(topic = "so53588657",
partitions = "#{T(com.example.So53588657Application$SplitParts).evens(50)}"))
public void evenParts(String in) {
// ...
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
return args -> registry.getListenerContainers()
.forEach(c -> c.getAssignedPartitions().forEach(ap -> System.out.println(ap)));
}
public static class SplitParts {
public static String[] odds(int partitions) {
return split(partitions, i -> i % 2 == 0);
}
public static String[] evens(int partitions) {
return split(partitions, i -> i % 2 == 1);
}
private static String[] split(int partitions, IntPredicate predicate) {
return IntStream.range(0, partitions)
.filter(predicate)
.mapToObj(i -> String.valueOf(i))
.collect(Collectors.toList())
.toArray(new String[0]);
}
}
}
Или вы можете предоставить их в виде списка через запятую в свойстве и использовать
partitions = { "#{'${partition.list}'.split(',')}" })