У меня есть два потока событий с метками времени, которые излучаются с разными скоростями, и я хотел бы объединить их в один композитный поток на основе их перекрывающихся временных окон. Это похоже на использование учебника для Flux.join, но я не могу понять, что предоставить для пары параметров. Согласно JavaDocs , параметры:
other
- другой издатель сопоставляет элементы с
leftEnd
- функция, которая возвращает издателя, выбросы которого указывают временное окно для значения источника, которое следует учитывать
rightEnd
- функция, которая возвращает издателя, выбросы которого указывают временное окно для правильного значения издателя, которое следует учитывать
resultSelector
- функция, которая берет элемент, испускаемый каждым издателем, и возвращает значение, которое должно быть получено в результате потока
Я понимаю, что other
- это второй поток, к которому нужно присоединиться, и что resultSelector
- это функция, которая объединяет одно значение из каждого потока в результат или совокупное значение для вывода на новый поток. Но я не могу разобраться с leftEnd
и rightEnd
. Их цель должна состоять в том, чтобы позволить оператору соединения определить, какие значения из 2 потоков перекрываются во времени. Но почему эти функции должны возвращать издателей и как может быть, чтобы два издателя имели разные типы значений? Наивно, я бы ожидал создать функцию для каждого входного потока, которая извлекает временную метку или период времени для каждого значения вместе с компаратором, который может определить, перекрываются ли они, поэтому, очевидно, что-то, что я не "получаю" по этому поводу .
Пример, вероятно, в порядке. Давайте просто предположим, что 2 входных потока имеют одинаковые типы значений, состоящие из String и периода времени, обозначенного beginTime и endTime, которые представляют собой длинные миллисекунды, т.е.
class ValueClass {
String text;
long beginTime;
long endTime;
}
Для простоты я напишу значения в виде {"A", 0, 250}, обозначая объект ValueClass с text = "A", beginTime = 0 и endTime = 250. Теперь предположим, что 2 входных потока дают следующие значения:
leftFlux: {"A", 0, 249}, {"B", 250, 499}, {"C", 500, 749}, {"D", 750, 999}
rightFlux: {"X", 0, 333}, {"Y", 334, 666}, {"Z", 667, 999}
На временной шкале я мог бы представить это примерно следующим образом:
A B C D
X Y Z
Исходя из этого, я ожидаю, что объединенный поток выдаст строковые значения:
"AX", "BX", "BY", "CY", "CZ", "DZ".
Код для выполнения этого должен быть похож на:
leftFlux.join(
rightFlux,
new Function<ValueClass, Publisher<something>>() {
@Override
public Publisher<something> apply(ValueClass v) {
return some-publisher;
}
},
new Function<ValueClass, Publisher<something>>() {
@Override
public Publisher<something> apply(ValueClass v) {
return some-publisher;
}
},
new BiFunction<ValueClass, ValueClass, String>() {
public String apply(ValueClass l, ValueClass r) {
return l.text + r.text;
}
}
);
Какими типами значений («что-то» в приведенном выше коде) должны быть издатели? И какие ценности должны издавать эти издатели?
Обновление
Даже после просмотра (к сожалению, нескольких) тестовых случаев, включенных в Reactor Core, я не смог понять, как Flux.join предназначен для использования. Однако я нашел другое решение для моего случая. Я создал класс, который подписывается на два потока, используя внутренние классы, помещает их элементы в явные очереди, которые затем обрабатываются шаг за шагом и создают результаты с помощью DirectProcessor.