Spring @StreamListener process (KStreamпоток) раздел - PullRequest
0 голосов
/ 15 февраля 2019

У меня есть тема с несколькими разделами в моем потоковом процессоре, я просто хотел передать это из одного раздела и не мог понять, как настроить этот

spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=s-processor
spring.cloud.stream.bindings.input.destination=uinput
spring.cloud.stream.bindings.input.group=r-processor
spring.cloud.stream.bindings.input.contentType=application/java-serialized-object
spring.cloud.stream.bindings.input.consumer.header-mode=raw
spring.cloud.stream.bindings.input.consumer.use-native-decoding=true

spring.cloud.stream.bindings.input.consumer.partitioned=true

@StreamListener(target = "input")
// @SendTo(value = { "uoutput" })
public void process(KStream<UUID, AModel> ustream) {

Я хочу, чтобы данные только одного раздела былиобработанный этим процессором, будут другие процессоры для другого раздела (ов)

До сих пор мой вывод был связан с https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/StreamsConfig.html#PARTITION_GROUPER_CLASS_CONFIG,, но я не смог найти, как установить это свойство в весеннем application.properties

Ответы [ 2 ]

0 голосов
/ 17 февраля 2019

Kafka Streams не позволяет читать один раздел.Если вы подписываетесь на тему, все разделы используются и распределяются по доступным экземплярам.Таким образом, вы не можете заранее знать, какой раздел назначен тому или иному экземпляру, и все экземпляры выполняют один и тот же код.

Но каждый раздел, связанный с процессором, имеет разные типы данных, поэтому требует другого процессораapplication

В этом случае процессор (или преобразователь) должен иметь возможность обрабатывать данные для всех разделов.Kafka Streams предоставляет номер раздела с помощью объекта ProcessorContext, который передается процессору с помощью метода init(): https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html#init-org.apache.kafka.streams.processor.ProcessorContext-

Таким образом, вам необходимо "разветвляться" внутри преобразователя, чтобы применить другую логику обработкина основе раздела:

ustream.transform(() -> new MyTransformer());


class MyTransformer implement Transformer {
  // other methods omitted

  R transform(K key, V value) {
    switch(context.partition()) { // get context from `init()`
      case 0:
        // your processing logic
        break;
      case 1:
        // your processing logic
        break;

      // ...
  }
}
0 голосов
/ 16 февраля 2019

Я думаю, что группировщик разделов должен группировать разделы с задачами в одном процессоре.Если вы хотите, чтобы процессор обрабатывал только один раздел, вам нужно предоставить как минимум то же количество экземпляров процессора, что и для разделов раздела.Например, если в вашей теме 4 раздела, вам нужно иметь 4 экземпляра потокового приложения, чтобы каждый экземпляр обрабатывал только один раздел.

...