Kafka Streams не позволяет читать один раздел.Если вы подписываетесь на тему, все разделы используются и распределяются по доступным экземплярам.Таким образом, вы не можете заранее знать, какой раздел назначен тому или иному экземпляру, и все экземпляры выполняют один и тот же код.
Но каждый раздел, связанный с процессором, имеет разные типы данных, поэтому требует другого процессораapplication
В этом случае процессор (или преобразователь) должен иметь возможность обрабатывать данные для всех разделов.Kafka Streams предоставляет номер раздела с помощью объекта ProcessorContext
, который передается процессору с помощью метода init()
: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html#init-org.apache.kafka.streams.processor.ProcessorContext-
Таким образом, вам необходимо "разветвляться" внутри преобразователя, чтобы применить другую логику обработкина основе раздела:
ustream.transform(() -> new MyTransformer());
class MyTransformer implement Transformer {
// other methods omitted
R transform(K key, V value) {
switch(context.partition()) { // get context from `init()`
case 0:
// your processing logic
break;
case 1:
// your processing logic
break;
// ...
}
}