После прочтения этой статьи я смог прочитать из одной темы1 и опубликовать в другой теме2. Но когда я хочу прочитать из topic2 в topic3, я получаю сообщение об ошибке типа «Компонент требует компонент с именем topic3», который не может быть найден. Поэтому я предполагаю, что еще не понял, как связаны темы.
Это работает (только для учебных целей):
/**
* get sample data from topic, create objects and send them
* @param s
* @return
*/
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object processStg1(String s) {
String arr[] = s.split(";");
if (arr[0].equalsIgnoreCase("Contract")) {
Contract c = new Contract();
c.setId(Integer.parseInt(arr[1]));
c.setName(arr[2]);
return c;
}
else if (arr[0].equalsIgnoreCase("Cashflow")) {
Cashflow cf = new Cashflow();
cf.setContractId(Integer.parseInt(arr[1]));
cf.setDate(arr[2]);
cf.setAmount(Float.parseFloat(arr[3]));
return cf;
}
return ("ERROR: could not parse type");
}
Я так понял, что привязал тему через application.properties:
spring.cloud.stream.bindings.output.destination=topic2
spring.cloud.stream.bindings.output.useNativeEncoding=true
spring.cloud.stream.bindings.input.destination=topic1
spring.cloud.stream.bindings.input.useNativeDecoding=true
Теперь я хотел бы прочитать Контракты из topic2 в том же приложении.
Что-то вроде
// это не работает
@StreamListener(Processor.INPUT)
public void processStg2(Contract c) {
System.out.println("a contract was found");
}
или
// это не работает
@StreamListener
public void process(@Input("topic2") KStream<String, Contract> contracts) {
System.out.println("Found contracts");
}
public interface ContractSink extends Sink {
@Input("topic2")
KStream<?, ?> inputStream();
}
Компонент требует bean-компонент с именем topic2, который не может быть найден.