У меня есть один узел кафка брокер и простое приложение потоков.Я создал 2 темы (topic1 и topic2).
Produced on topic1 - processed message - write to topic2
Примечание. Для каждого созданного сообщения в целевую тему записывается только одно сообщение
Я создал одно сообщение,После того, как это было записано в topic2, я остановил брокера kafka.Через некоторое время я перезапустил брокер и выдал другое сообщение по теме1.Теперь приложение Streams обработало это сообщение 3 раза.Теперь, не останавливая посредника, я генерировал сообщения для topic1 и ждал, пока потоковое приложение не напишет в topic2, прежде чем снова производить.
Приложение Streams ведет себя странно.Иногда для одного созданного сообщения написано 2 сообщения в целевой теме, а иногда 3. Я не понимаю, почему это происходит.Я имею в виду, что даже сообщения, созданные после перезапуска брокера, дублируются.
Обновление 1:
Я использую Kafka версии 1.0.0 и Kafka-Streams версии 1.1.0
Ниже приведен код.
Main.java
String credentials = env.get("CREDENTIALS");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials , activityStream);
activityResultStream.to("o365_user_activity");
AppUtil.java
public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {
KStream<String, String> activityResultStream = activityStream
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> log = new ArrayList<String>();
JSONObject received = new JSONObject(value);
String url = received.get("url").toString();
String accessToken = ServiceUtil.getAccessToken(credentials);
JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);
log.add(activityLog.toString());
}
return log;
}
});
return activityResultStream;
}
Обновление 2:
В среде с одним брокером и одним разделом с вышеуказанным конфигом я запустил приложение брокера Kafka и потоков.Произведено 6 сообщений по исходной теме, и когда я начал потребителя по целевой теме, было 36 сообщений и их количество.Они продолжают приходить.
Итак, я запустил это, чтобы увидеть consumer-groups
:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
Вывод:
streams-collection-app-0
Далее я запустил это:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0
Вывод:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 1 0 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer
Через некоторое время вывод показал это:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 6 5 - - -
А затем:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 7 6 - - -