У меня такая топология:
Topology topology = new Topology();
//WS connection processor
topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer(), KafkaTopics.WS_CONNECTION_EVENTS_TOPIC)
.addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
.addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
.addSink(WS_STATUS_SINK, KafkaTopics.WS_USER_ONLINE_STATUS_TOPIC, stringSerializer, stringSerializer, SESSION_PROCESSOR)
//WS session routing
.addSource(WS_ROUTING_BY_SESSION_SOURCE, new StringDeserializer(), new StringDeserializer(),
KafkaTopics.WS_DELIVERY_TOPIC)
.addProcessor(WS_ROUTING_BY_SESSION_PROCESSOR, WSSessionRoutingProcessor::new,
WS_ROUTING_BY_SESSION_SOURCE)
.addStateStore(userConnectedNodesStoreBuilder, WS_ROUTING_BY_SESSION_PROCESSOR, SESSION_PROCESSOR)
//WS delivery
.addSource(WS_DELIVERY_SOURCE, new StringDeserializer(), new StringDeserializer(),
INSTANCE_SPECIFIC_TOPIC)
.addProcessor(WS_DELIVERY_PROCESSOR, WSDeliveryProcessor::new, WS_DELIVERY_SOURCE);
Последним в топологии указан источник c указывает c для каждого экземпляра приложения. Я хочу, чтобы этот topi c обрабатывался только этим экземпляром. Данные в этот topi c передаются предыдущим процессором в зависимости от того, какой экземпляр должен обработать это сообщение.
Но как только поток запускается, он пытается назначить специфику экземпляра c topi c разделы также в другие экземпляры. Можем ли мы выполнить это требование в потоках Kafka?
Я хочу, чтобы один topi c обрабатывался только указанным c экземпляром.