Я запускаю несколько экземпляров потребителя потока kafka (2 экземпляра) на моем локальном компьютере, каждый из которых имеет свое собственное локальное хранилище и у каждого свое имя.
Согласно документации, если один из экземпляров выйдет из строя,Кафка должна синхронизировать хранилище мертвых экземпляров с хранилищем живого экземпляра (исправьте меня, если я ошибаюсь).
Я настроил оба экземпляра с одинаковым идентификатором приложения, чтобы kafka знал, что эти экземпляры принадлежат одному и тому жеgroup.
Когда один из экземпляров уничтожен, хранилище другого (живого) экземпляра не синхронизируется с хранилищем мертвого экземпляра.Я включил тему журнала изменений в обоих хранилищах.
Однако, когда у меня одинаковое имя хранилища в обоих экземплярах, хранилища синхронизируются, как и ожидалось, не уверен, что эти экземпляры указывают на одно хранилище. У меня разные StreamsConfig.STATE_DIR_CONFIG расположение этих двух экземпляров.
Пожалуйста, дайте мне знать, если я что-то упустил, может ли имя хранилища отличаться в разных экземплярах приложения?Кафка автоматически позаботится о воспроизведении темы журнала изменений в новом хранилище экземпляров?
// ниже - моя конфигурация потока
@Bean
public KafkaStreams kafkaStreams(KafkaProperties properties,
@Value("${spring.application.name}") String appName) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
props.put(StreamsConfig.CLIENT_ID_CONFIG, "client2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
//props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams1");
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
new RoundRobinAssignor().getClass().getName());
props.put("auto.offset.reset", "earliest");
final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
System.out.println("Invoked kafkaStreams");
//kafkaStreams.cleanUp();
kafkaStreams.start();
return kafkaStreams;
}