Есть ли способ использовать новый экземпляр компонента Autowired внутри агрегата Kafka Streams ??
@EnableBinding(Processor.class)
public class MessageReceiver {
@StreamListener(target = Processor.INPUT)
@SendTo(Processor.OUTPUT)
public KStream<String, List<CustomEvent>> process(KStream<String, Event> eventKStream) {
JsonSerde<EventAggregator> eventAggregatorJsonSerde = new JsonSerde<>(EventAggregator.class);
TimeWindowedKStream<String, Event> timeWindowedKStream = eventKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000));
timeWindowedKStream
.aggregate(
EventAggregator::new, // how to use Autowired bean here, which is different for different window and different key
((key, value, aggregator) -> aggregator.add(value)),
Materialized.with(Serdes.String(), eventAggregatorJsonSerde)
);
// continues...
}
Есть ли способ использовать компонент типа Autowired вместо EventAggregator :: new ??
EventAggregator.java
public class EventAggregator {
//@Autowired
//private SomeClass someClass; // this is null as of now, since spring can't autowire inside a non managed bean
List<CustomEvent> customEventList = new ArrayList<>();
FIFOMap<String, Integer> map = new LinkedHashMap<>(4); // map which stores only 4 value
public EventAggregator add(Event event) {
map.put(new Date().toString(), event.getValue());
generateCustomEvent(); // when I have 4 values, I need to do some operation and generate custom event
return this;
}
private void generateCustomEvent() {
if (map.size() == 4) {
// someClass complex operation
CustomEvent e = new CustomEvent("", "", "");
customEventList.add(e);
}
}
}