Почему потоки kafka повторно обрабатывают сообщения, созданные после перезапуска брокера - PullRequest
0 голосов
/ 07 июня 2018

У меня есть один узел кафка брокер и простое приложение потоков.Я создал 2 темы (topic1 и topic2).

Produced on topic1 - processed message - write to topic2

Примечание. Для каждого созданного сообщения в целевую тему записывается только одно сообщение

Я создал одно сообщение,После того, как это было записано в topic2, я остановил брокера kafka.Через некоторое время я перезапустил брокер и выдал другое сообщение по теме1.Теперь приложение Streams обработало это сообщение 3 раза.Теперь, не останавливая посредника, я генерировал сообщения для topic1 и ждал, пока потоковое приложение не напишет в topic2, прежде чем снова производить.

Приложение Streams ведет себя странно.Иногда для одного созданного сообщения написано 2 сообщения в целевой теме, а иногда 3. Я не понимаю, почему это происходит.Я имею в виду, что даже сообщения, созданные после перезапуска брокера, дублируются.

Обновление 1:

Я использую Kafka версии 1.0.0 и Kafka-Streams версии 1.1.0

Ниже приведен код.

Main.java

String credentials = env.get("CREDENTIALS");

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials , activityStream);
activityResultStream.to("o365_user_activity");

AppUtil.java

public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {

        KStream<String, String> activityResultStream = activityStream
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {

                        ArrayList<String> log = new ArrayList<String>();
                        JSONObject received = new JSONObject(value);
                        String url = received.get("url").toString();

                        String accessToken = ServiceUtil.getAccessToken(credentials);
                        JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);

                        log.add(activityLog.toString());
                    }
                    return log;
                }                   
            });

                return activityResultStream;
    }

Обновление 2:

В среде с одним брокером и одним разделом с вышеуказанным конфигом я запустил приложение брокера Kafka и потоков.Произведено 6 сообщений по исходной теме, и когда я начал потребителя по целевой теме, было 36 сообщений и их количество.Они продолжают приходить.

Итак, я запустил это, чтобы увидеть consumer-groups:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

Вывод:

streams-collection-app-0

Далее я запустил это:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0

Вывод:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                                                HOST            CLIENT-ID
o365_activity_contenturl 0          1               1               0               streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1      streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer

Через некоторое время вывод показал это:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               6               5               -               -               -

А затем:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               7               6               -               -               -

1 Ответ

0 голосов
/ 29 сентября 2018

кажется, что вы столкнулись с известным ограничением.Тема Kafka по умолчанию хранит сообщения не менее 7 дней, но зафиксированные смещения сохраняются в течение 1 дня (значение конфигурации по умолчанию offsets.retention.minutes = 1440).так что если в исходной теме не было сообщений в течение более 1 дня, после перезапуска приложения все сообщения из темы будут повторно обработаны (фактически несколько раз, в зависимости от количества перезапусков, максимум 1 раз в день для такой темы с редкими входящими сообщениями).).

вы можете найти описание смещений с истекшим сроком действия Как истекает срок действия смещения для группы потребителей .

в kafka версии 2.0 срок хранения для зафиксированных смещений увеличен KIP-186: Увеличьте срок хранения смещения до 7 дней .

, чтобы предотвратить повторную обработку, можно добавить свойство потребителя auto.offset.reset: latest (значение по умолчанию earliest).существует небольшой риск с latest: если никто не создавал сообщение в исходную тему дольше в тот же день, и после этого вы перезапустили приложение, вы могли бы потерять некоторые сообщения (только сообщения, которые пришли именно при перезапуске).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...