Несколько Kstream @StreamListener - PullRequest
0 голосов
/ 31 января 2019

Я работаю с приложением, в котором процессор, например, должен обрабатывать несколько объектов модели домена;Пользователь (-и) и Роли (-и)

В моем процессоре я пытаюсь разобраться с несколькими темами:

@StreamListener(target = "uinput", copyHeaders = "true")
@SendTo(value = { "uoutput" })
public KStream<UUID, Event> processu(KStream<UUID, Event> ustream) {...}

@StreamListener(target = "rinput", copyHeaders = "true")
@SendTo(value = { "routput" })
public KStream<UUID, Event> processu(KStream<UUID, Event> ustream) {...}


public class TestRepositoryResponseListener {

@StreamListener(target = "uminput", copyHeaders = "true")
public void process(@Payload Event event) {
    log.debug("Processing User {}", event);

    // log.debug("Processing2 {}",eventSaved);
}

}

и конфигурация потоков такова;

spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.group-id=user-repository-client-group
spring.kafka.consumer.client-id=user-repository-client

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.xx.yy


spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.client-id=user-repository-producer

spring.cloud.stream.kafka.streams.binder.application-id=user-repository


spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages=com.xx.yy

spring.cloud.stream.bindings.uinput.destination=uinput
spring.cloud.stream.bindings.uinput.group=user-processor
spring.cloud.stream.bindings.uinput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.uinput.consumer.header-mode=raw
spring.cloud.stream.bindings.uinput.consumer.use-native-decoding=true

spring.cloud.stream.bindings.uoutput.destination=uoutput
spring.cloud.stream.bindings.uoutput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.uoutput.producer.header-mode=raw
spring.cloud.stream.bindings.uoutput.producer.use-native-encoding=true

= TEST Streams

spring.cloud.stream.bindings.umoutput.destination=uinput
spring.cloud.stream.bindings.umoutput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.umoutput.producer.header-mode=raw
spring.cloud.stream.bindings.umoutput.producer.use-native-encoding=true

==== Включение этого вызывает ошибки

spring.cloud.stream.bindings.uminput.destination=uoutput
spring.cloud.stream.bindings.uminput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.uminput.group=user-processor-test-listener
spring.cloud.stream.bindings.uminput.consumer.header-mode=raw
spring.cloud.stream.bindings.uminput.consumer.use-native-decoding=true

[ERROR] 2019-01-31 08:26:51,967 org.springframework.kafka.listener.LoggingErrorHandler.handle(LoggingErrorHandler.java:37) - Error while processing: null
.

Я хочу установить несколько прослушивателей потока для каждого «процессора»и «слушатели ответа», которые кажутся более сложными, чем мои ожидания.Помогите, пожалуйста, предложить пример с kstreams и тестируемым usign junit 5, который работает с использованием следующего:

@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "uinput", "uoutput", "aoutput", "rinput", "routput" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
    "spring.cloud.stream.kafka.streams.binder.brokers=${spring.embedded.kafka.brokers}" }

в качестве дополнительного пункта, если кто-то работал с @StreamListener (... condition = "payload.xx") с Kstream мне также нужен ваш

Я преодолел проблему, исправив проблемы, как предложено здесь;

Несколько StreamListeners с Spring Cloud Stream, подключенным к Kafka

Теперь, вероятно, последняя проблема, которую нужно преодолеть, - это обработка RuntimeError. Как видно из вставленного мной кода, это Spring Repo, который может генерировать все виды org.springframework.dao.DataAccessException, которые необходимо распространить обратно ввызывающая сторона, в этом случае веб-сервис уровня обслуживания.Пожалуйста, помогите

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...