Я пытаюсь создать приложение, управляемое событиями.Требуется, чтобы некоторые службы использовали только события Kafka для хранения информации, поэтому я использую таблицы Kafka.
У меня есть две службы, которые совместно используют одну и ту же таблицу Kafka, это сложная бизнес-логика, поэтому яхочу убедиться, что один и тот же код создает таблицу.
Мой вопрос таков: могу ли я использовать один и тот же идентификатор клиента для разных экземпляров одной и той же таблицы Кафки?
Я построил пример,и это работает один раз.Но сейчас у меня проблемы.Всегда приводится исключение: The state store, topic-name, may have migrated to another instance.
Мои обе службы работают на одной машине без контейнеров.
Я настраиваю свои потоки Kafka с помощью:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
config.put(StreamsConfig.CLIENT_ID_CONFIG, applicationName + "-client");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigurationResolver.get().resolve("kafka.broker"));
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreTypeMismatch.class);
config.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreTypeMismatch.class);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericGsonSerde.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
KafkaStreams streams = new KafkaStreams(builder.build(), config)
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));