Spring Cloud Stream Kafka агрегат использовать новый экземпляр bean-компонента autowired - PullRequest
0 голосов
/ 08 ноября 2019

Есть ли способ использовать новый экземпляр компонента Autowired внутри агрегата Kafka Streams ??

@EnableBinding(Processor.class)
public class MessageReceiver {

    @StreamListener(target = Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<String, List<CustomEvent>> process(KStream<String, Event> eventKStream) {

        JsonSerde<EventAggregator> eventAggregatorJsonSerde = new JsonSerde<>(EventAggregator.class);

        TimeWindowedKStream<String, Event> timeWindowedKStream = eventKStream
                .groupByKey()
                .windowedBy(TimeWindows.of(60000).advanceBy(30000));

        timeWindowedKStream
                .aggregate(
                        EventAggregator::new, // how to use Autowired bean here, which is different for different window and different key
                        ((key, value, aggregator) -> aggregator.add(value)),
                        Materialized.with(Serdes.String(), eventAggregatorJsonSerde)
                );

        // continues...
    }

Есть ли способ использовать компонент типа Autowired вместо EventAggregator :: new ??

EventAggregator.java

public class EventAggregator {

    //@Autowired
    //private SomeClass someClass; // this is null as of now, since spring can't autowire inside a non managed bean

    List<CustomEvent> customEventList = new ArrayList<>();  
    FIFOMap<String, Integer> map = new LinkedHashMap<>(4); // map which stores only 4 value

    public EventAggregator add(Event event) {   
        map.put(new Date().toString(), event.getValue());       
        generateCustomEvent(); // when I have 4 values, I need to do some operation and generate custom event
        return this;
    }

    private void generateCustomEvent() {
        if (map.size() == 4) {
            // someClass complex operation
            CustomEvent e = new CustomEvent("", "", "");
            customEventList.add(e);
        }
    }
}

1 Ответ

0 голосов
/ 14 ноября 2019

Как указано в комментариях, создание EventAggregator::new запускается Kafka Streams. Если вы хотите получить доступ к Spring bean-компонентам внутри этого объекта, вы можете сделать это одним из способов, предоставив конструктор, который принимает контекст приложения Spring, а затем явно запросить bean-компоненты внутри класса (например, с помощью метода getBean()). Вы можете автоматически связать контекст в главном классе и затем передать его вместе с конструктором EventAggregator.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...