I described the inputs and outputs:
public interface UserStream {
    String USER_EVENT_OUTPUT = "user-event-out";
    String USER_IMPORT = "user-import";
    String USER_EXPORT = "user-export";

    @Output(USER_EVENT_OUTPUT)
    MessageChannel userEventOutput();

    @Output(USER_EXPORT)
    KTable<?, ?> userOutput();

    @Input(USER_IMPORT)
    KTable<?, ?> userInput();
}
as well as a processor:
@EnableBinding(UserStream.class)
....

@StreamListener
@SendTo(UserStream.USER_EXPORT)
public KTable<UserIdDto, UserDto> webQuoteUserConsumer(
        @Input(UserStream.USER_IMPORT) KTable<UserIdDto, WebQuoteUser> users) {
    return users
            .mapValues((id, user) -> createNewUser(user, id))
            .filter((id, user) -> user.isPresent())
            .mapValues((id, user) -> mapper.mapToUserDto(user.get()));
}
but I got:

Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.kstream.KTable among registered factories: channelFactory,messageSourceFactory
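For context: the stack trace lists only channelFactory and messageSourceFactory, which are the binding-target factories of the channel-based binder; the factories that create KStream/KTable binding targets are contributed by the Kafka Streams binder. One thing worth checking (an assumption on my part, since the build file is not shown here) is whether the spring-cloud-stream-binder-kafka-streams artifact is on the classpath at all, e.g. in a Maven build:

```xml
<!-- Assumption: the Kafka Streams binder may be missing from the build.
     This artifact registers the factories for KStream/KTable binding targets;
     without it only channelFactory and messageSourceFactory are available. -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
```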
It looks like I missed something in the configuration. Here is what I put in my yml:
spring:
  cloud:
    stream:
      bindings:
        user_import:
          group: steaming-service-group
          destination: user-export-topic
          contentType: application/json
          consumer:
            resetOffsets: true
            startOffset: earliest
            header-mode: headers
        quote_import:
          group: steaming-service-group
          destination: web-quotes
          contentType: application/json
          consumer:
            resetOffsets: true
            startOffset: earliest
            header-mode: headers
        quote_export:
          destination: quote_import-topic
          contentType: application/json
          producer:
            header-mode: headers
....
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zkNodes: localhost:2181
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              commit.interval.ms: 1000
The sample code was taken from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-table-join