Уменьшить выбросы потока на основе стоимости? - PullRequest
0 голосов
/ 17 октября 2018

У меня следующий поток:

val myFlux = Flux.fromIterable(listOf("A", "A", "B", "C", "C"))

Элементы внутри этого потока отсортированы, и сам поток огромен (можно предположить, что он бесконечный)

Я хотел бы отобразить егов Flux<String>, где каждый String будет содержать AA B CC соответственно.

Также может быть Flux<SomeDomainObject>, где каждый SomeDomainObject будет содержать одинаковые ключи (например, AA)

Я не могу использовать groupBy, поскольку количество ключей будет огромным.

Требуемое поведение потока: desired flux behavior

Ответы [ 2 ]

0 голосов
/ 17 октября 2018

Вы можете использовать bufferUntil -функцию: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#bufferUntil-java.util.function.Predicate-boolean-

fun<T> newItem(): Predicate<T> {
    var lastItem: T? = null

    return Predicate<T> { newItem ->
        when {
            lastItem == null -> {
                lastItem = newItem
                false
            }
            lastItem != newItem -> {
                lastItem = newItem
                true
            }
            else -> false
        }
    }
}

fun main(args: Array<String>) {
    val flux = Flux.just("A", "A", "B", "C", "C")
    val buffered = flux.bufferUntil(newItem(), true).map { it.joinToString("") }

    buffered.subscribe {
        println(it)
    }
}
0 голосов
/ 17 октября 2018

А как насчет этого фрагмента кода:

Flux<String> letters = Flux.just("A", "A", "A", "A", "B", "C", "C");
Flux<String> sequence = letters.bufferUntil(new Predicate<String>() {

        private AtomicReference<String> currentLetter = new AtomicReference<>("");

        @Override
        public boolean test(String letter) {
            String current = currentLetter.getAndSet(letter);
            return current.equals(letter);
        }
    }).map(list -> String.join("", list));
...