Можно ли иметь одну потребительскую ветку Kafka на тему? - PullRequest
2 голосов
/ 22 мая 2019

У нас есть приложение Springboot, которое использует Spring-Kafka (2.1.7). Мы включили параллелизм, поэтому у нас может быть один потребительский поток на раздел. Так что в настоящее время, если у нас есть 3 темы, каждая с 2 разделами, будет 2 пользовательских потока, как показано ниже:

ConsumerThread1 - [topic1-0, topic2-0, topic3-0]
ConsumerThread2 - [тема1-1, тема2-1, тема3-1]

Однако вместо одного KafkaListener (или потребительского потока) на раздел мы хотели бы иметь один потребительский поток на тему . Например:

ConsumerThread1 - [topic1-0, topic1-1]
ConsumerThread2 - [topic2-0, topic2-1]
ConsumerThread3 - [topic3-0, topic3-1]

Если это невозможно, подойдет даже следующая настройка:

ConsumerThread1 - [topic1-0]
ConsumerThread2 - [topic1-1]
ConsumerThread3 - [topic2-0]
ConsumerThread4 - [topic2-1]
ConsumerThread5 - [topic3-0]
ConsumerThread6 - [topic3-1]

Загвоздка в том, что мы не знаем полный список тем до раздачи (мы используем шаблон темы подстановочных знаков). Новая тема может быть добавлена ​​в любое время, и новая динамическая тема (или темы) должна динамически создаваться для этой новой темы во время выполнения.

Есть ли способ, которым это может быть достигнуто?

Ответы [ 3 ]

3 голосов
/ 23 мая 2019

Вы можете создать отдельные контейнеры для каждой темы из spring-kafka: 2.2 и установить параллелизм 1, так что каждый контейнер будет потреблять из каждой темы

Начиная с версии 2.2Вы можете использовать ту же фабрику для создания любого ConcurrentMessageListenerContainer.Это может быть полезно, если вы хотите создать несколько контейнеров со схожими свойствами, или вы хотите использовать какую-то внешне настроенную фабрику, такую ​​как та, которая предоставляется автоконфигурацией Spring Boot.После создания контейнера вы можете дополнительно изменить его свойства, многие из которых устанавливаются с помощью container.getContainerProperties ().В следующем примере настраивается ConcurrentMessageListenerContainer:

@Bean
public ConcurrentMessageListenerContainer<String, String>(
    ConcurrentKafkaListenerContainerFactory<String, String> factory) {

ConcurrentMessageListenerContainer<String, String> container =
    factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}

Примечание: Контейнеры, созданные таким образом, не добавляются в реестр конечных точек.Они должны быть созданы как определения @Bean, чтобы они были зарегистрированы в контексте приложения.

1 голос
/ 23 мая 2019

Вы можете использовать пользовательский Partitioner для распределения разделов по своему усмотрению.Это свойство потребителя кафки.

РЕДАКТИРОВАТЬ

См. этот ответ .

Это для @JmsListener ноту же технику можно применить и к кафке.

0 голосов
/ 07 июня 2019

Благодаря предложениям @ Гэри Рассела я смог найти следующее решение, которое создает экземпляр компонента (или поток потребителя) @KafkaListener для каждой темы Кафки. Таким образом, если существует проблема с сообщениями, относящимися к определенной теме, это не повлияет на обработку других тем.

Примечание - следующий код выдает исключение InstanceAlreadyExistsException при запуске. Однако это, похоже, не влияет на функциональность. Используя выходные данные журнала, я могу убедиться, что для каждой темы существует один экземпляр (или поток) компонента, и они могут обрабатывать сообщения.

@SpringBootApplication
@EnableScheduling
@Slf4j
public class KafkaConsumerApp {

    public static void main(String[] args) {
        log.info("Starting spring boot KafkaConsumerApp..");
        SpringApplication.run(KafkaConsumerApp.class, args);
    }

}


@EnableKafka
@Configuration
public class KafkaConfiguration {

    private final KafkaProperties kafkaProperties;

    @Value("${kafka.brokers:localhost:9092}")
    private String bootstrapServer;

    @Value("${kafka.consumerClientId}")
    private String consumerClientId;

    @Value("${kafka.consumerGroupId}")
    private String consumerGroupId;

    @Value("${kafka.topicMonitorClientId}")
    private String topicMonitorClientId;

    @Value("${kafka.topicMonitorGroupId}")
    private String topicMonitorGroupId;

    @Autowired
    private ConfigurableApplicationContext context;

    @Autowired
    public KafkaConfiguration( KafkaProperties kafkaProperties ) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory( consumerClientId, consumerGroupId ) );
        factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> topicMonitorContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory( topicMonitorClientId, topicMonitorGroupId ) );
        factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
        factory.getContainerProperties().setConsumerRebalanceListener( new KafkaRebalanceListener( context ) );
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory( String clientId, String groupId ) {
        Map<String, Object> config = new HashMap<>();
        config.putAll( kafkaProperties.buildConsumerProperties() );
        config.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer );
        config.put( ConsumerConfig.CLIENT_ID_CONFIG, clientId );
        config.put( ConsumerConfig.GROUP_ID_CONFIG, groupId );
        config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false ); // needs to be turned off for rebalancing during topic addition and deletion
                                                                    // check -> /8877715/mozhno-li-imet-odnu-potrebitelskuy-vetku-kafka-na-temucomment99401765_56274988
        return new DefaultKafkaConsumerFactory<>( config, new StringDeserializer(), new StringDeserializer() );
    }
}


@Configuration
public class KafkaListenerConfiguration {

    @Bean
    @Scope("prototype")
    public KafkaMessageListener kafkaMessageListener() {
        return new KafkaMessageListener();
    }

}


@Slf4j
public class KafkaMessageListener {

    /*
     * This is the actual message listener that will process messages. It will be instantiated per topic.
     */
    @KafkaListener( topics = "${topic}", containerFactory = "kafkaListenerContainerFactory" )
    public void receiveHyperscalerMessage( ConsumerRecord<String, String> record, Acknowledgment acknowledgment, Consumer<String, String> consumer ) {

        log.debug("Kafka message - ThreadName={}, Hashcode={}, Partition={}, Topic={}, Value={}", 
                Thread.currentThread().getName(), Thread.currentThread().hashCode(), record.partition(), record.topic(), record.value() );

        // do processing

        // this is just a sample acknowledgment. it can be optimized to acknowledge after processing a batch of messages. 
        acknowledgment.acknowledge();
    }

}


@Service
public class KafkaTopicMonitor {

    /*
     * The main purpose of this listener is to detect the rebalance events on our topic pattern, so that 
     * we can create a listener bean instance (consumer thread) per topic. 
     *
     * Note that we use the wildcard topic pattern here.
     */
    @KafkaListener( topicPattern = ".*abc.def.ghi", containerFactory = "topicMonitorContainerFactory" )
    public void monitorTopics( ConsumerRecord<String, String> record ) {
        // do nothing
    }

}


@Slf4j
public class KafkaRebalanceListener implements ConsumerAwareRebalanceListener {

    private static final ConcurrentMap<String, KafkaMessageListener> listenerMap = new ConcurrentHashMap<>();
    private final ConfigurableApplicationContext context;

    public KafkaRebalanceListener( ConfigurableApplicationContext context ) {
        this.context = context;
    }

    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // do nothing
    }

    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // do nothing
    }

    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

        log.info("OnPartitionsAssigned - partitions={} - {}", partitions.size(), partitions);
        Properties props = new Properties();
        context.getEnvironment().getPropertySources().addLast( new PropertiesPropertySource("topics", props) );

        for( TopicPartition tp: partitions ) {

            listenerMap.computeIfAbsent( tp.topic(), key -> {
                log.info("Creating messageListener bean instance for topic - {}", key );
                props.put( "topic", key );
                // create new KafkaMessageListener bean instance
                return context.getBean( "kafkaMessageListener", KafkaMessageListener.class );
            });
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...