Хранилища локальных состояний, находящиеся в экземплярах потока kafka, не синхронизируются в случае сбоя одного экземпляра - PullRequest
0 голосов
/ 20 июня 2019

Я запускаю несколько экземпляров потребителя потока kafka (2 экземпляра) на моем локальном компьютере, каждый из которых имеет свое собственное локальное хранилище и у каждого свое имя.

Согласно документации, если один из экземпляров выйдет из строя,Кафка должна синхронизировать хранилище мертвых экземпляров с хранилищем живого экземпляра (исправьте меня, если я ошибаюсь).

Я настроил оба экземпляра с одинаковым идентификатором приложения, чтобы kafka знал, что эти экземпляры принадлежат одному и тому жеgroup.

Когда один из экземпляров уничтожен, хранилище другого (живого) экземпляра не синхронизируется с хранилищем мертвого экземпляра.Я включил тему журнала изменений в обоих хранилищах.

Однако, когда у меня одинаковое имя хранилища в обоих экземплярах, хранилища синхронизируются, как и ожидалось, не уверен, что эти экземпляры указывают на одно хранилище. У меня разные StreamsConfig.STATE_DIR_CONFIG расположение этих двух экземпляров.

Пожалуйста, дайте мне знать, если я что-то упустил, может ли имя хранилища отличаться в разных экземплярах приложения?Кафка автоматически позаботится о воспроизведении темы журнала изменений в новом хранилище экземпляров?

// ниже - моя конфигурация потока

@Bean
public KafkaStreams kafkaStreams(KafkaProperties properties,
                                 @Value("${spring.application.name}") String appName) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "client2");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    //props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams1");
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
            new RoundRobinAssignor().getClass().getName());
    props.put("auto.offset.reset", "earliest");
    final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
    System.out.println("Invoked kafkaStreams");
    //kafkaStreams.cleanUp();
    kafkaStreams.start();
    return kafkaStreams;
}

1 Ответ

0 голосов
/ 25 июня 2019

Я запускаю несколько экземпляров потребителя потока kafka (2 экземпляра) на моем локальном компьютере, каждый из которых имеет свое собственное локальное хранилище и у каждого свое имя.

Это звучит неправильно.Если вы запускаете несколько экземпляров с одним и тем же application.id (т.е. group.id), все экземпляры должны выполнять один и тот же код.(Мне интересно, почему ваше приложение не падает в первую очередь.)

Я не уверен на 100%, чего вы пытаетесь достичь.Может быть полезно, если бы вы могли поделиться своим кодом топологии?

Обратите внимание, что логические хранилища шардов KafkaStreams основаны на количестве разделов входной темы (ср. https://docs.confluent.io/current/streams/architecture.html). Может быть, вы путаете шардинг с логическими хранилищами?

Если вы хотите иметь два логических хранилища, каждое с одним осколком, вы все равно можете запустить несколько экземпляров, и хранилища будут выполняться в разных экземплярах (и аварийное переключение будет работать тоже). Однако вам все равно нужно"включить" оба хранилища в обоих экземплярах при запуске.

...