Если localTweets
- конечный поток и помещается в память, то работает следующее:
Flux<Tweet> remoteTweets = Flux.just(
new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place"),
new Tweet("tag2", new TweetID("text", "name"), "userimage", "country", "place")
);
Flux<Tweet> localTweets = Flux.just(
new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place")
).cache(); // note cache operator here, to avoid mutiple subscription
remoteTweets
.filterWhen(remoteTweet -> localTweets.hasElement(remoteTweet).map(hasElement -> !hasElement))
.subscribe(System.out::println);
Если поток конечен, но не помещается в память, то вы должны оставить cache
оператор вышел. Это будет означать, что localTweets
Flux будет подписан несколько раз.
Если поток бесконечный, вам следует применить некоторую стратегию управления окнами (например, проверять твиты только за последние 10 минут).