Производитель не может рассылать сообщения нескольким потребителям? - PullRequest
0 голосов
/ 10 июля 2020

Я пытаюсь автоматически масштабировать java потребителя kafka spring framework с docker -compose с флагом --scale и портами пересылки в docker -compose.yaml, например «8070-8072: 8070». Поэтому, когда я запускаю конечную точку для публикации сообщений, это хорошо. Но со стороны потребителя все сообщения потребляет только 1 потребитель. У меня есть 3 потребителя с одинаковым идентификатором группы и другим идентификатором клиента. Я хочу распределенный обмен сообщениями. Я прочитал статью о разделении и просмотрел свои журналы. Вроде всего 1 раздел. Это причина? Как я могу этого добиться? Я добавлю журнал, конфигурацию потребителя, конфигурацию издателя и файл docker -compose. Во-первых, журнал . Кажется, только 3 из 1 имеют раздел.

команда:

docker-compose up --build --scale nonwebapp=3 --scale webapp=3

docker -compose.yaml

kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
            - zookeeper
        ports:
            - 9092:9092
        environment:
            KAFKA_LOG_DIRS: "/kafka/kafka-logs"
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
            KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        restart: always
            
    webapp:
        build: benchmark.web
        command: bash -c "while !</dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to up...' sleep 1; done;"
        ports: 
            - "8070-8072:8070"
        volumes:
            - ./logs:/logs
            - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
        depends_on:
          - kafka
        environment:
          SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
          TZ: "Asia/Istanbul"
          GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
          GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
        restart: always
            
            
    nonwebapp:
        build: benchmark.nonweb
        command: bash -c "while !</dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to up...' sleep 1; done;"
        depends_on:
          - kafka
        volumes:
          - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
        ports:
          - "8060-8062:8060"
        environment:
          SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
          GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
          GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
          TZ: "Asia/Istanbul"
        restart: always

конфигурация производителя

@Bean
    ProducerFactory<String, byte[]> producerFactory(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaInContainerAdress);

    /*
        configProps.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaInLocal);
*/
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class
        );
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                ByteArraySerializer.class
        );
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String,byte[]> kafkaTemplate(){

        return new KafkaTemplate<>(producerFactory());
    }

конфигурация потребителя

 @Bean
    public ConsumerFactory<String, byte[]> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
       props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaInContainerAdress);
       /*
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaInLocal);
*/
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, r.nextInt());
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class);
        props.put("group.id","topic_trial_consumers");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

1 Ответ

2 голосов
/ 10 июля 2020

Кажется, там всего 1 раздел. Это причина?

Да, если есть только один раздел, только один потребитель (из группы потребителей) может использовать его, а другие потребители (из той же группы) будут бездействовать, даже если они

Кажется, только 3 из 1 имеют раздел

Из вашего изображения я вижу, что файл topic_trial-0. Итак, это 1-й раздел topic_trial.

Увеличьте номер. разделов, например, до 3 и запускать трех потребителей с одинаковыми group.id, и нагрузка должна распределяться (по одному разделу для каждого).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...