Проверка поведения кафки при отключении одного брокера (spring-kafka) - PullRequest
1 голос
/ 01 апреля 2020

Я начинаю в Кафке.

У меня есть один кластер с двумя брокерами (идентификаторы № 2 и № 3) и коэффициентом репликации 2. Я хочу проверить поведение Кафки, если я отключаюсь один брокер (id # 3).

После закрытия # 3, мой topi c информация:

Topic: CUSTOMER PartitionCount: 5       ReplicationFactor: 2    Configs:
        Topic: CUSTOMER Partition: 0    Leader: 2       Replicas: 3,2   Isr: 2
        Topic: CUSTOMER Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2
        Topic: CUSTOMER Partition: 2    Leader: 2       Replicas: 3,2   Isr: 2
        Topic: CUSTOMER Partition: 3    Leader: 2       Replicas: 2,3   Isr: 2
        Topic: CUSTOMER Partition: 4    Leader: 2       Replicas: 3,2   Isr: 2

Каждый раздел реплицируется на всех брокеров и теперь # 2 Лидером является брокер, это нормально.

Публикация сообщения в порядке, но она не используется моей службой поддержки пользователей (я использую Spring-kafka).

В момент отключения , журналы потребителей:

2020-04-01 14:51:42.736  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:677        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Discovered group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null)
2020-04-01 14:51:42.737  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:729        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-04-01 14:51:42.840  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:677        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Discovered group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null)
2020-04-01 14:51:42.841  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Connection to node 2147483644 could not be established. Broker may not be available.
2020-04-01 14:51:42.841  INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator:729        [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Group coordinator 10.0.0.0:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-04-01 14:51:42.842  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-6, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.
2020-04-01 14:51:43.136  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-5, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.
2020-04-01 14:51:43.184  WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] org.apache.kafka.clients.NetworkClient:671         [][][] : [Consumer clientId=consumer-3, groupId=NOTIF] Connection to node 3 could not be established. Broker may not be available.

Тогда ничего. И ничего в журналах зоопарка.

Когда я запускаю своего брокера, все сообщения теперь потребляются.

Можете ли вы сказать мне, если я ошибаюсь? В моей конфигурации topi c я предполагаю, что отключение одного брокера должно быть возможным без последствий.

Моя конфигурация kafka:

broker.id=2 (not the same value on the other broker)
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/share/kafka/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=serv2:2181,serv3:2181,serv5:2181
zookeeper.connection.timeout.ms=6000
default.replication.factor=1
offsets.topic.replication.factor=1

И моя конфигурация zookeeper:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/share/zookeeper/data
server.2=serv2:2888:3888;2181
server.3=serv3:2888:3888;2181
server.5=serv5:2888:3888;2181

Я создаю топи c с помощью Spring Kafka:

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    @Bean
    public NewTopic notifTopic() {
        return new NewTopic(notifTopic, partitions, (short) bootstrapAddress.split(",").length);
    }

И для потребителя: Конфиг:

@EnableKafka
@Configuration
@Profile({ "!mockKafka & !test" })
public class KafkaConfiguration implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;

    @Value(value = "${kafka.servers}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groups.notif.name}")
    private String notifGroup;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(getConsumerFactoryProperties()));
        factory.setConcurrency(5);
        return factory;
    }

    private Map<String, Object> getConsumerFactoryProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, notifGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(validator);
    }

}

Слушатель:

@Service
@Slf4j
@Profile({"!mockKafka & !test"})
@Transactional
@KafkaListener(containerFactory = "containerFactory", topics = { "${kafka.topics.notif.name}" })
public class NotificationListener { 

    @KafkaHandler
    public void email(@Payload @Valid EmailNotification record, @Header(ContextUtils.HEADER_ACCOUNT) String account,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) long partition) {
        log.info("Consuming message [EMAIL] from topic [{}], partition [{}], offset [{}]", topic, partition, offset);
        ...
    }

и мои глобальные конфигурации:

kafka:
    servers: serv2:9092,serv3:9092
    publish.timeout: 3000
    partitions: 5
    topics:
        customer:
            name: CUSTOMER
        notif:
            name: NOTIF
        health:
            name: HEALTH
    groups:
        customer:
            name: CUSTOMER
        notif:
            name: NOTIF

Используемые версии: Kafka: 2.4.0 Zookeeper: 3.5.6 Spring-kafka: 2.2.12

Спасибо.

1 Ответ

1 голос
/ 01 апреля 2020

Я думаю, что это

offsets.topic.replication.factor=1

Поскольку смещения не реплицируются, потребитель не может найти свою позицию.

Согласно документам, по умолчанию установлено значение 3 (если не указано); но он переопределяется на 1 (по крайней мере, в моем доморощенном дистите).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...