По сути, наше приложение имеет одну заключительную тему, к которой в конечном итоге приводят все процессы, и мне нужно иметь возможность определить, когда все сообщения обрабатываются.У меня есть количество сообщений, отправленных в коллекции mongoDB, но я должен быть в состоянии сказать, что все сообщения были обработаны.Я пытался использовать Spring Cloud Stream (поскольку это то, что используют все наши микросервисы) и KStream (что является новым для меня), и пытаюсь получить его, чтобы я агрегировал сообщения в последней теме по идентификатору и увеличивал поле вmongoDB, который представляет количество обработанных сообщений.Однако с KStream я установил windowedBy () на 30 секунд, он сбрасывает счетчик каждые 30 секунд (как и предполагалось), но пытается увеличить монго каждый раз, когда приходит сообщение.Ниже приведены некоторые части кода с выводом счетчика в Систему вместо увеличения в монго для тестирования.ПРИМЕЧАНИЕ: сейчас часть окна не работает, так как я делаю много изменений, пытаясь заставить ее работать.
input
.groupBy((s, domainEvent) -> domainEvent.getBoardUuid(), Serialized.with(null, domainEventSerde))
.windowedBy(TimeWindows.of(30000))
.count(Materialized.as("fileCount")).toStream()
.process(new ProcessorSupplier<Object, Long>() {
public Processor<Object, Long> get() {
// PopularPageEmailAlert is your custom processor that implements
// the `Processor` interface, see further down below.
return new testProc("generating counts");
}
});
, а затем процессор
public class testProc implements Processor<Object, Long> {
private ProcessorContext context;
private String s;
public testProc(String s) {
this.s=s;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void process(Object key, Long value) {
// TODO Auto-generated method stub
System.out.println("****** "+key+" ********* "+value);
}
}