API клиента Kafka Producer не может выполнять асинхронную отправку, когда целевой брокер использует SASL / PLAIN - PullRequest
0 голосов
/ 17 сентября 2018

У меня была простая демонстрация для передачи данных из одного кластера kafka, который не использовал SASL, в другой кластер kafka, который использовал SASL / PLAIN. и коды выглядят так:

 Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.50.20:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        producerProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        producerProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                PlainLoginModule.class.getName() + " required username=\"%s\" " + "password=\"%s\";",
                "admin",
                "admin"
        ));
       //and some other producer properties

        KafkaProducer producer = new KafkaProducer<>(props);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    ProducerRecord<String, String> record = new ProducerRecord<>("test", record.key()
                            ,record.value());
                    producer.send(record);
                }
            }

Просто потреблял данные и передавал их в другой кластер kafka, но дело в следующем: когда я написал другой потребительский клиент для потребления этих данных из 192.168.1.100:9092

    Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        String password = EncryptUtil.encryptPassword(USER_NAME, QUERY_KEY, QUERY_SECRET);
        System.out.println(password);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                PlainLoginModule.class.getName() + " required username=\"%s\" " + "password=\"%s\";",
                "admin",
                "admin"
        ));
        while (true) {
                ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                   System.out.println(record.value());
                }
            }

и печатать было нечего, кроме

    9419 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Discovered group coordinator 192.168.1.100:9092 (id: 2147483647 rack: null)
18456 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Revoking previously assigned partitions []
18456 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] (Re-)joining group
18471 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Successfully joined group with generation 21
18471 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Setting newly assigned partitions [flink-kafka-0]
27522 [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Resetting offset for partition flink-kafka-0 to offset 0.
27537 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Node 0 sent an invalid full fetch response with extra=(flink-kafka-0, response=(
57556 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
87563 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
117585 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
146158 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Node 0 sent an invalid full fetch response with extra=(flink-kafka-0, response=(
176263 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
206333 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
236418 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
266492 [kafka-coordinator-heartbeat-thread | group-snKiBQ0O] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
296558 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
317372 [kafka-coordinator-heartbeat-thread | group-snKiBQ0O] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Group coordinator 192.168.1.100:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
326571 [main] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=snKiBQ0O, groupId=group-snKiBQ0O] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..

и затем я использовал эту оболочку ./bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.1.100:9092 --command-config config/client_plain.properties --group group-snKiBQ0O для проверки правильности отправки данных, и LOG-END-OFFSET имеет значение 5912, но CURRENT-OFFSET всегда было 0.

Наконец, я изменил producer.send(record); на producer.send(record).get();, и потребительский клиент успешно получил данные. это почему? Почему брокер, использующий SASL / PLAIN, не может асинхронно отправлять данные? Есть ли хороший способ с этим справиться?

Спасибо.

ОБНОВЛЕНИЕ: Я удалил все данные журнала kafka и zookeeper, и они могут нормально работать. Но до сих пор не понимаю, почему это произошло

...