Пожалуйста, помогите мне понять параметры Flux.join - PullRequest
1 голос
/ 08 июня 2019

У меня есть два потока событий с метками времени, которые излучаются с разными скоростями, и я хотел бы объединить их в один композитный поток на основе их перекрывающихся временных окон. Это похоже на использование учебника для Flux.join, но я не могу понять, что предоставить для пары параметров. Согласно JavaDocs , параметры:

  • other - другой издатель сопоставляет элементы с
  • leftEnd - функция, которая возвращает издателя, выбросы которого указывают временное окно для значения источника, которое следует учитывать
  • rightEnd - функция, которая возвращает издателя, выбросы которого указывают временное окно для правильного значения издателя, которое следует учитывать
  • resultSelector - функция, которая берет элемент, испускаемый каждым издателем, и возвращает значение, которое должно быть получено в результате потока

Я понимаю, что other - это второй поток, к которому нужно присоединиться, и что resultSelector - это функция, которая объединяет одно значение из каждого потока в результат или совокупное значение для вывода на новый поток. Но я не могу разобраться с leftEnd и rightEnd. Их цель должна состоять в том, чтобы позволить оператору соединения определить, какие значения из 2 потоков перекрываются во времени. Но почему эти функции должны возвращать издателей и как может быть, чтобы два издателя имели разные типы значений? Наивно, я бы ожидал создать функцию для каждого входного потока, которая извлекает временную метку или период времени для каждого значения вместе с компаратором, который может определить, перекрываются ли они, поэтому, очевидно, что-то, что я не "получаю" по этому поводу .

Пример, вероятно, в порядке. Давайте просто предположим, что 2 входных потока имеют одинаковые типы значений, состоящие из String и периода времени, обозначенного beginTime и endTime, которые представляют собой длинные миллисекунды, т.е.

class ValueClass {
    String text;
    long beginTime;
    long endTime;
}

Для простоты я напишу значения в виде {"A", 0, 250}, обозначая объект ValueClass с text = "A", beginTime = 0 и endTime = 250. Теперь предположим, что 2 входных потока дают следующие значения:

leftFlux: {"A", 0, 249}, {"B", 250, 499}, {"C", 500, 749}, {"D", 750, 999}
rightFlux: {"X", 0, 333}, {"Y", 334, 666}, {"Z", 667, 999}

На временной шкале я мог бы представить это примерно следующим образом:

A   B   C   D
X     Y    Z

Исходя из этого, я ожидаю, что объединенный поток выдаст строковые значения:

"AX", "BX", "BY", "CY", "CZ", "DZ".

Код для выполнения этого должен быть похож на:

leftFlux.join(
    rightFlux,
    new Function<ValueClass, Publisher<something>>() {
            @Override
            public Publisher<something> apply(ValueClass v) {
                return some-publisher;
            }
        },
    new Function<ValueClass, Publisher<something>>() {
            @Override
            public Publisher<something> apply(ValueClass v) {
                return some-publisher;
            }
        },
    new BiFunction<ValueClass, ValueClass, String>() {
        public String apply(ValueClass l, ValueClass r) {
            return l.text + r.text;
        }
    }
);

Какими типами значений («что-то» в приведенном выше коде) должны быть издатели? И какие ценности должны издавать эти издатели?


Обновление

Даже после просмотра (к сожалению, нескольких) тестовых случаев, включенных в Reactor Core, я не смог понять, как Flux.join предназначен для использования. Однако я нашел другое решение для моего случая. Я создал класс, который подписывается на два потока, используя внутренние классы, помещает их элементы в явные очереди, которые затем обрабатываются шаг за шагом и создают результаты с помощью DirectProcessor.

...