Предположим, у меня есть корзина, из которой мне нужно получить документы с датой старше, чем сейчас. Этот документ выглядит следующим образом:
{
id: "1",
date: "Some date",
otherObjectKEY: "key1"
}
Для каждого документа мне нужно получить другой документ, используя его otherObjectKEY , отправить последний в топику kafka c, а затем удалить оригинал document.
Используя реагирующий java драйвер 3.0 , я смог сделать это примерно так:
public void batch(){
streamOriginalObjects()
.flatMap(originalObject -> fetchOtherObjectUsingItsKEY(originalObject)
.flatMap(otherObject -> sendToKafkaAndDeleteOriginalObject(originalObject))
)
.subscribe();
}
streamOriginalObjects ():
public Flux<OriginalObject> streamOriginalObjects(){
return client.query("select ... and date <= '"+ LocalDateTime.now().toString() +"'")
.flux()
.flatMap(result -> result.rowsAs(OriginalObject.class));
}
Это работает, как и ожидалось, но мне интересно, есть ли лучший подход (особенно с точки зрения производительности), чем потоковая передача и обработка элемент за элементом.