Как использовать темы, указанные в экземпляре c, в потоках Kafka? - PullRequest
0 голосов
/ 02 августа 2020

У меня такая топология:

Topology topology = new Topology();

//WS connection processor
topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer(), KafkaTopics.WS_CONNECTION_EVENTS_TOPIC)
    .addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
    .addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
    .addSink(WS_STATUS_SINK, KafkaTopics.WS_USER_ONLINE_STATUS_TOPIC, stringSerializer, stringSerializer, SESSION_PROCESSOR)

//WS session routing
    .addSource(WS_ROUTING_BY_SESSION_SOURCE, new StringDeserializer(), new StringDeserializer(),
                    KafkaTopics.WS_DELIVERY_TOPIC)
    .addProcessor(WS_ROUTING_BY_SESSION_PROCESSOR, WSSessionRoutingProcessor::new,
                    WS_ROUTING_BY_SESSION_SOURCE)
    .addStateStore(userConnectedNodesStoreBuilder, WS_ROUTING_BY_SESSION_PROCESSOR, SESSION_PROCESSOR)

//WS delivery
    .addSource(WS_DELIVERY_SOURCE, new StringDeserializer(), new StringDeserializer(),
                    INSTANCE_SPECIFIC_TOPIC)
    .addProcessor(WS_DELIVERY_PROCESSOR,  WSDeliveryProcessor::new, WS_DELIVERY_SOURCE);  

Последним в топологии указан источник c указывает c для каждого экземпляра приложения. Я хочу, чтобы этот topi c обрабатывался только этим экземпляром. Данные в этот topi c передаются предыдущим процессором в зависимости от того, какой экземпляр должен обработать это сообщение.

Но как только поток запускается, он пытается назначить специфику экземпляра c topi c разделы также в другие экземпляры. Можем ли мы выполнить это требование в потоках Kafka?

Я хочу, чтобы один topi c обрабатывался только указанным c экземпляром.

1 Ответ

0 голосов
/ 08 августа 2020

То, что вы хотите, невозможно. Для программы Kafka Streams все экземпляры одного и того же приложения должны быть совершенно одинаковыми и, следовательно, должны иметь одинаковые темы ввода.

Вам нужно будет разделить ваше приложение на 4 приложения: первое приложение выполняет разделяет раздел программы и записывает в 3 разные темы. Кроме того, у вас есть еще 3 приложения (с собственными application.id s), каждое из которых читает одну из этих тем.

Обратите внимание, что при желании вы можете запустить несколько клиентов KafkaStreams на одной JVM.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...