Spring-Kafka - Как я могу указать диапазон разделов для слушателей, используя аннотации? - PullRequest
0 голосов
/ 03 декабря 2018

У меня есть 3 темы, и каждая тема имеет 50 разделов.Я хочу использовать @KafkaListener для указания прослушивателей для разных разделов

  • Нужно ли иметь одного прослушивателя для каждой темы?

  • Нужно ли иметь несколько прослушивателейдля одной темы, если да, как я могу указать диапазон разделов для темы?

  • Кроме того, из 3 тем в 2 темах содержится гораздо больше данных, чем в третьей, поэтому я долженУ вас есть больше слушателей этой темы, чтобы она могла справиться с нагрузкой?

  • А как выбрать номер параллелизма?

Мой код пока:

    @Configuration
    @EnableKafka
public class ConsumerConfig {

  // Factory to create the consumer classes
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
  }

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return propsMap;
  }


  //
  @Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

  @Bean
  public Listener listener() {
    return new Listener();
  }
}

1 Ответ

0 голосов
/ 03 декабря 2018

См. документацию .

Вы также можете настроить слушателей POJO с явными темами и разделами (и, по желанию, с их начальными смещениями):

@KafkaListener(id = "bar", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

Один слушатель может прослушивать несколько тем, но если у вас разные объемы сообщений, я бы порекомендовал отдельного слушателя для каждой темы;в противном случае тема с низким объемом может не получить желаемой активности.

РЕДАКТИРОВАТЬ

Вы можете использовать выражение SpEL для создания массива разделов.

Например;два слушателя, один получает нечетные разделы, а другой получает четные, может быть настроен следующим образом ...

@SpringBootApplication
public class So53588657Application {

    public static void main(String[] args) {
        SpringApplication.run(So53588657Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so53588657", 50, (short) 1);
    }

    @KafkaListener(id = "odd", topicPartitions =
            @TopicPartition(topic = "so53588657",
                partitions = "#{T(com.example.So53588657Application$SplitParts).odds(50)}"))
    public void oddParts(String in) {
        // ...
    }

    @KafkaListener(id = "even", topicPartitions =
            @TopicPartition(topic = "so53588657",
                partitions = "#{T(com.example.So53588657Application$SplitParts).evens(50)}"))
    public void evenParts(String in) {
        // ...
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> registry.getListenerContainers()
                .forEach(c -> c.getAssignedPartitions().forEach(ap -> System.out.println(ap)));
    }

    public static class SplitParts {

        public static String[] odds(int partitions) {
            return split(partitions, i -> i % 2 == 0);
        }

        public static String[] evens(int partitions) {
            return split(partitions, i -> i % 2 == 1);
        }

        private static String[] split(int partitions, IntPredicate predicate) {
            return IntStream.range(0, partitions)
                    .filter(predicate)
                    .mapToObj(i -> String.valueOf(i))
                    .collect(Collectors.toList())
                    .toArray(new String[0]);
        }

    }

}

Или вы можете предоставить их в виде списка через запятую в свойстве и использовать

partitions = { "#{'${partition.list}'.split(',')}" })
...