Я работаю с приложением, в котором процессор, например, должен обрабатывать несколько объектов модели домена;Пользователь (-и) и Роли (-и)
В моем процессоре я пытаюсь разобраться с несколькими темами:
@StreamListener(target = "uinput", copyHeaders = "true")
@SendTo(value = { "uoutput" })
public KStream<UUID, Event> processu(KStream<UUID, Event> ustream) {...}
@StreamListener(target = "rinput", copyHeaders = "true")
@SendTo(value = { "routput" })
public KStream<UUID, Event> processu(KStream<UUID, Event> ustream) {...}
public class TestRepositoryResponseListener {
@StreamListener(target = "uminput", copyHeaders = "true")
public void process(@Payload Event event) {
log.debug("Processing User {}", event);
// log.debug("Processing2 {}",eventSaved);
}
}
и конфигурация потоков такова;
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=user-repository-client-group
spring.kafka.consumer.client-id=user-repository-client
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.xx.yy
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.client-id=user-repository-producer
spring.cloud.stream.kafka.streams.binder.application-id=user-repository
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages=com.xx.yy
spring.cloud.stream.bindings.uinput.destination=uinput
spring.cloud.stream.bindings.uinput.group=user-processor
spring.cloud.stream.bindings.uinput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.uinput.consumer.header-mode=raw
spring.cloud.stream.bindings.uinput.consumer.use-native-decoding=true
spring.cloud.stream.bindings.uoutput.destination=uoutput
spring.cloud.stream.bindings.uoutput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.uoutput.producer.header-mode=raw
spring.cloud.stream.bindings.uoutput.producer.use-native-encoding=true
= TEST Streams
spring.cloud.stream.bindings.umoutput.destination=uinput
spring.cloud.stream.bindings.umoutput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.umoutput.producer.header-mode=raw
spring.cloud.stream.bindings.umoutput.producer.use-native-encoding=true
==== Включение этого вызывает ошибки
spring.cloud.stream.bindings.uminput.destination=uoutput
spring.cloud.stream.bindings.uminput.contentType=application/java-serialized-object
spring.cloud.stream.bindings.uminput.group=user-processor-test-listener
spring.cloud.stream.bindings.uminput.consumer.header-mode=raw
spring.cloud.stream.bindings.uminput.consumer.use-native-decoding=true
[ERROR] 2019-01-31 08:26:51,967 org.springframework.kafka.listener.LoggingErrorHandler.handle(LoggingErrorHandler.java:37) - Error while processing: null
.
Я хочу установить несколько прослушивателей потока для каждого «процессора»и «слушатели ответа», которые кажутся более сложными, чем мои ожидания.Помогите, пожалуйста, предложить пример с kstreams и тестируемым usign junit 5, который работает с использованием следующего:
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "uinput", "uoutput", "aoutput", "rinput", "routput" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.cloud.stream.kafka.streams.binder.brokers=${spring.embedded.kafka.brokers}" }
в качестве дополнительного пункта, если кто-то работал с @StreamListener (... condition = "payload.xx") с Kstream мне также нужен ваш
Я преодолел проблему, исправив проблемы, как предложено здесь;
Несколько StreamListeners с Spring Cloud Stream, подключенным к Kafka
Теперь, вероятно, последняя проблема, которую нужно преодолеть, - это обработка RuntimeError. Как видно из вставленного мной кода, это Spring Repo, который может генерировать все виды org.springframework.dao.DataAccessException, которые необходимо распространить обратно ввызывающая сторона, в этом случае веб-сервис уровня обслуживания.Пожалуйста, помогите