Потребитель Kafka не может получить некоторые сообщения - PullRequest
0 голосов
/ 01 ноября 2018

Недавно у меня возникли некоторые проблемы при использовании пользовательских AIP-файлов kafka, и есть мой код:

public class ConsumerClientDemo {

    private static final String KAFKA_SERVERS = "17.162.110.1:9292,17.162.112.1:9293,17.162.114.1:9294";
    private static final String GROUP = "group-admin-test";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "mbGW4rH5");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                PlainLoginModule.class.getName() + " required username=\"%s\" " + "password=\"%s\";",
                "admin",
                "admin"
        ));
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("mbGW4rH5"));
        final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            isShuttingDown.set(true);
            synchronized (consumer) {
                consumer.close();
            }
        }));
        try {
            while (!isShuttingDown.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("topic=%s, partition=%s, offset = %d, key = %s, value = %s%n",
                                record.topic(), record.partition(),record.offset(), record.key(), deviceData.value());
                    }
                }
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

Сначала проблем не было, но при успешном получении 15 сообщений консоль выдает:

10:42:40:446 INFO [FetchSessionHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Node 2 sent an invalid full fetch response with extra=(mbGW4rH5-0, response=(
10:43:10:499 INFO [FetchSessionHandler] [Consumer clientId=mbGW4rH5, groupId=group-admin-test] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..

и затем клиент был приостановлен с тех пор, он не мог получать никаких сообщений, поэтому я включил отладку журнала. И есть журналы:

10:58:11:200 INFO [FetchSessionHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
10:58:11:200 DEBUG [Fetcher] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Added READ_UNCOMMITTED fetch request for partition mbGW4rH5-0 at offset 15 to node 17.162.114.1:9294 (id: 2 rack: null)
10:58:11:200 DEBUG [FetchSessionHandler$Builder] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s).
10:58:11:200 DEBUG [Fetcher] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Sending READ_UNCOMMITTED FullFetchRequest(mbGW4rH5-0) to broker 17.162.114.1:9294 (id: 2 rack: null)
10:58:13:161 DEBUG [AbstractCoordinator] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Sending Heartbeat request to coordinator cloud-access.hanclouds.com:9292 (id: 2147483647 rack: null)
10:58:13:207 DEBUG [AbstractCoordinator$HeartbeatResponseHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Received successful Heartbeat response
10:58:15:113 DEBUG [ConsumerCoordinator] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Sending asynchronous auto-commit of offsets {mbGW4rH5-0=OffsetAndMetadata{offset=15, metadata=''}}
10:58:15:159 DEBUG [ConsumerCoordinator$OffsetCommitResponseHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Committed offset 15 for partition mbGW4rH5-0
10:58:15:159 DEBUG [ConsumerCoordinator$4] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Completed asynchronous auto-commit of offsets {mbGW4rH5-0=OffsetAndMetadata{offset=15, metadata=''}}
10:58:16:162 DEBUG [AbstractCoordinator] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Sending Heartbeat request to coordinator 17.162.110.1:9292 (id: 2147483647 rack: null)
10:58:16:217 DEBUG [AbstractCoordinator$HeartbeatResponseHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Received successful Heartbeat response
10:58:19:162 DEBUG [AbstractCoordinator] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Sending Heartbeat request to coordinator 17.162.110.1:9292 (id: 2147483647 rack: null)
10:58:19:211 DEBUG [AbstractCoordinator$HeartbeatResponseHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Received successful Heartbeat response
10:58:20:114 DEBUG [ConsumerCoordinator] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Sending asynchronous auto-commit of offsets {mbGW4rH5-0=OffsetAndMetadata{offset=15, metadata=''}}
10:58:20:165 DEBUG [ConsumerCoordinator$OffsetCommitResponseHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Committed offset 15 for partition mbGW4rH5-0
10:58:20:165 DEBUG [ConsumerCoordinator$4] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Completed asynchronous auto-commit of offsets {mbGW4rH5-0=OffsetAndMetadata{offset=15, metadata=''}}
10:58:22:163 DEBUG [AbstractCoordinator] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Sending Heartbeat request to coordinator 17.162.110.1:9292 (id: 2147483647 rack: null)
10:58:22:226 DEBUG [AbstractCoordinator$HeartbeatResponseHandler] [Consumer clientId=mbGW4rH5, groupId=group-mbGW4rH5] Received successful Heartbeat response

Казалось, что клиент не может получить смещение 15, поэтому я изменил группу и установил смещение на самое последнее, и оно снова заработало. Итак, я хочу спросить, почему смещение 15 не может быть получено ? и как перепрыгнуть через смещение, которое не может быть получено, чтобы избежать зависания клиента? Кстати, версия Kafka была 2.0.0, как и клиенты kafka.

Спасибо.

...