Какой идентификатор клиента Kafka Table используется - PullRequest
0 голосов
/ 08 октября 2018

Я пытаюсь создать приложение, управляемое событиями.Требуется, чтобы некоторые службы использовали только события Kafka для хранения информации, поэтому я использую таблицы Kafka.

У меня есть две службы, которые совместно используют одну и ту же таблицу Kafka, это сложная бизнес-логика, поэтому яхочу убедиться, что один и тот же код создает таблицу.

Мой вопрос таков: могу ли я использовать один и тот же идентификатор клиента для разных экземпляров одной и той же таблицы Кафки?

Я построил пример,и это работает один раз.Но сейчас у меня проблемы.Всегда приводится исключение: The state store, topic-name, may have migrated to another instance.

Мои обе службы работают на одной машине без контейнеров.

Я настраиваю свои потоки Kafka с помощью:

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
config.put(StreamsConfig.CLIENT_ID_CONFIG, applicationName + "-client");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigurationResolver.get().resolve("kafka.broker"));
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreTypeMismatch.class);
config.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreTypeMismatch.class);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericGsonSerde.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
KafkaStreams streams = new KafkaStreams(builder.build(), config)
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

1 Ответ

0 голосов
/ 08 октября 2018

Если у вас есть две разные службы, вы должны использовать две разные application.id s.Вы не можете использовать один и тот же KTable в разных службах, но для каждой службы требуется собственная копия KTable.Таким образом, вы либо используете один и тот же код для обеих служб, чтобы создать один и тот же KTable, либо вы позволяете одному сервису вычислять KTable, записываете его в тему, а другой сервис использует эту тему, чтобы получить копиюKTable.

Если вы хотите поделиться KTable, вам нужно объединить обе службы в одну службу с одним application.id.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...