Невозможно создать сообщения потребления - PullRequest
1 голос
/ 07 октября 2019

Я новичок в весенне-интеграционной кафке. Я настраиваю Кафку на Docker, но не могу создавать и потреблять сообщения. Вот мой файл составления докера. Любая помощь будет очень признательна. Спасибо

zookeeper:
        image: wurstmeister/zookeeper
        ports:
         - "2181:2181"

kafka:
        image: wurstmeister/kafka
        ports:
           - "9092:9092"
           - "9999:9999"
        environment:
           - KAFKA_ADVERTISED_HOST_NAME=20.0.201.75
           - KAFKA_LISTENERS=INSIDE://:9090,OUTSIDE://:9092
           - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9090,OUTSIDE://20.0.201.75:9092
           - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
           - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
           - JMX_PORT=9999
           - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
           - KAFKA_JMX_OPTS= -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=9999
        volumes:
           - /var/run/docker.sock:/var/run/docker.sock

Я отправляю сообщения на 20.0.201.75:9092 и потребляю сообщения с 20.0.201.75:9092 и 20.0.201.75, здесь указывается dockerhostip. Код стороны производителя

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    public String sendData() {
        System.out.println("hewe");
        for(int i=0;i<10;i++) {
        ListenableFuture<SendResult<String, String>> s = kafkaTemplate.send("dms", Integer.valueOf(0),String.valueOf(i+230), "added message in dms");
            try {
                s.get();
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("sent successfully");

        return "";

    }

Код потребительской стороны:

    @Bean("kafkaListenerContainerFactory")
    public KafkaMessageListenerContainer<String, String> container() throws Exception {
        ContainerProperties properties = new ContainerProperties("dms");
        KafkaMessageListenerContainer<String,String> kmlc = new KafkaMessageListenerContainer<String,String>(consumerFactory(), properties);
        // set more properties
        return kmlc;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "20.0.201.75:9092");
        // set more properties
        return new DefaultKafkaConsumerFactory<>(props);
    }


...