Весенний облачный поток Кафка агрегатируется с KStream - PullRequest
0 голосов
/ 11 декабря 2018

По сути, наше приложение имеет одну заключительную тему, к которой в конечном итоге приводят все процессы, и мне нужно иметь возможность определить, когда все сообщения обрабатываются.У меня есть количество сообщений, отправленных в коллекции mongoDB, но я должен быть в состоянии сказать, что все сообщения были обработаны.Я пытался использовать Spring Cloud Stream (поскольку это то, что используют все наши микросервисы) и KStream (что является новым для меня), и пытаюсь получить его, чтобы я агрегировал сообщения в последней теме по идентификатору и увеличивал поле вmongoDB, который представляет количество обработанных сообщений.Однако с KStream я установил windowedBy () на 30 секунд, он сбрасывает счетчик каждые 30 секунд (как и предполагалось), но пытается увеличить монго каждый раз, когда приходит сообщение.Ниже приведены некоторые части кода с выводом счетчика в Систему вместо увеличения в монго для тестирования.ПРИМЕЧАНИЕ: сейчас часть окна не работает, так как я делаю много изменений, пытаясь заставить ее работать.

input
            .groupBy((s, domainEvent) -> domainEvent.getBoardUuid(), Serialized.with(null, domainEventSerde))
            .windowedBy(TimeWindows.of(30000))
            .count(Materialized.as("fileCount")).toStream()
            .process(new ProcessorSupplier<Object, Long>() {
                public Processor<Object, Long> get() {
                    // PopularPageEmailAlert is your custom processor that implements
                    // the `Processor` interface, see further down below.
                    return new testProc("generating counts");
                }
            });

, а затем процессор

public class testProc implements Processor<Object, Long> {

private ProcessorContext context;
private String s;
public testProc(String s) {
    this.s=s;
}

@Override
public void init(ProcessorContext context) {
    this.context = context;

}



@Override
public void close() {
    // TODO Auto-generated method stub

}

@Override
public void process(Object key, Long value) {
    // TODO Auto-generated method stub
    System.out.println("****** "+key+" ********* "+value);  
}
}
...