Вопрос
С помощью Flux как получить доступ к предыдущему элементу?
Фон
У меня есть внешний поток событий, который дает события в порядке, порядке тогоПоток должен отправлять одно событие, а сразу после этого отправлять другое событие.Однако метаданные для второго события находятся в первом событии.
Обратите внимание, что это не всегда четное число событий.
То, что я пытаюсь сделать, это объединить события впоток событий для потребления вниз по течению.
Flux#zip
выглядел многообещающе, но это означало бы возврат объекта внешнего типа события.
Начальный код
Пока что яУ меня есть.
BinaryLogClient client = new BinaryLogClient(host, port, username, password);
Flux<Event> bridge = Flux.create(sink -> {
EventListener fluxListener = event -> {
sink.next(event);
};
client.registerEventListener(fluxListener);
});
bridge.subscribe(DemoApplication::printEvent);
bridge.map(new EventPairMemorizer());
public class EventPair {
private final Event previous;
private final Event current;
public EventPair(Event previous, Event current) {
this.previous = previous;
this.current = current;
}
/**
* @return `null` if no previous events.
*/
public Event getPrevious() {
return previous;
}
public Event getCurrent() {
return current;
}
}
/**
* Not thread safe has to go on a single thread
*/
public class EventPairMemorizer implements Function<Event, EventPair> {
Event previous = null;
EventPair toPair(Event e) {
EventPair pair = new EventPair(previous, e);
previous = e;
return pair;
}
@Override
public EventPair apply(Event current) {
return toPair(current);
}
}
Это частично учебное упражнение, частично подтверждение концепции.
Не относящиеся к делу детали
Я пытаюсь использовать mysql-binlog-connector-java для получения потока об изменениях в базе данных.
Итак, если я получаю событие EXT_WRITE_ROWS
, предыдущее событие будет TABLE_MAP
.Затем я хочу выполнить поиск столбца для события TABLE_MAP
(используя jdbc).Затем преобразуйте в некоторую внутреннюю структуру, дружественную JSON.
То же самое относится к событию EXT_UPDATE_ROWS
.
Итак, код идеи выглядит следующим образом:
- onExternalEvent push для Flux
- проверка типа события.При совпадении вызова jdbc в потоке jdbc с использованием Mono
- объедините Mono и текущее событие.
- сопоставьте с внутренним типом.
- отправьте в другой поток.
- прибыль