У меня есть Stream Listener как
@StreamListener(target = "requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
// Predicate<UUID, Event> isAccount = (key, value) ->
// value.getEntity().getClass().equals(Account.class);
// @formatter:off
return events
//.filter(isAccount)
.peek((key, value) -> {
log.debug("Processing {} {}", key, value);
});
/*
.filter(isAccount)
.map((key, value) -> process(value))
.peek((key, value) -> {
log.debug("Processed {} {}", key, value);
});
*/
// @formatter:on
}
Где конфигурация @Input ("requestti") следующая:
spring.cloud.stream.kafka.streams.bindings.requesti.consumer.application-id=repo-event-consumer
spring.cloud.stream.bindings.requesti.destination=request
spring.cloud.stream.bindings.requesti.content-type=application/json
spring.cloud.stream.bindings.requesti.consumer.header-mode=raw
и @output ("responseo")Конфигурация выглядит следующим образом:
spring.cloud.stream.kafka.streams.bindings.responseo.consumer.application-id=repo-response-producer
spring.cloud.stream.bindings.responseo.destination=response
spring.cloud.stream.bindings.responseo.content-type=application/json
spring.cloud.stream.bindings.responseo.producer.header-mode=raw
spring.cloud.stream.bindings.responseo.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.responseo.producer.key-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.responseo.producer.value-serde=org.springframework.kafka.support.serializer.JsonSerde
Мой Процессор получает запрос и может также отправлять вывод, но вывод следующий:
[Producer clientId = repo-event-consumer -49827b40-2357-4af0-8103-228343faa59e-StreamThread-1-Manufacturer] Отправка записи ProducerRecord (тема = ответ, раздел = ноль, заголовки = RecordHeaders (заголовки = [RecordHeader (ключ = TypeId ), значение = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101,46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65,99, 99, 111, 117, 110, 116])], isReadOnly = true), ключ = [B @ 6a5e4294, значение = [B @ 5a0852e1, отметка времени = 1551093349173) с обратным вызовом org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1@336dbba5 к ответной теме темына 2
Несколько вещей, которые я путаю с идентификатором записи производителя, не является "repo-response-produser", и, во-вторых, key-serde / value-serde не используется, на мой взгляд, это должно было быть
Отправка записи ProducerRecord (тема = запрос, раздел = ноль, заголовки = RecordHeaders (заголовки = [RecordHeader (ключ = Key_TypeId * , значение = [106, 97, 118, 97, 46, 117, 116,105, 108, 46, 85, 85, 73, 68]), RecordHeader (ключ = TypeId , значение = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), ключ = 6f0f50e2-3add-4d22-a370-cac66d016af0, значение = Account () с обратным вызовом org.springframework.kafka.core.KafkaTemplate$$Lambda$582/533392019@85ab964 для раздела запроса темы 2
и по умолчанию serdeConfig
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
Репо