Сочетание записи потока с предыдущим и картой - PullRequest
0 голосов
/ 18 декабря 2018

Вопрос

С помощью Flux как получить доступ к предыдущему элементу?

Фон

У меня есть внешний поток событий, который дает события в порядке, порядке тогоПоток должен отправлять одно событие, а сразу после этого отправлять другое событие.Однако метаданные для второго события находятся в первом событии.

Обратите внимание, что это не всегда четное число событий.

То, что я пытаюсь сделать, это объединить события впоток событий для потребления вниз по течению.

Flux#zip выглядел многообещающе, но это означало бы возврат объекта внешнего типа события.

Начальный код

Пока что яУ меня есть.

    BinaryLogClient client = new BinaryLogClient(host, port, username, password);
    Flux<Event> bridge = Flux.create(sink -> {
        EventListener fluxListener = event -> {
            sink.next(event);
        };

        client.registerEventListener(fluxListener);
    });

    bridge.subscribe(DemoApplication::printEvent);
    bridge.map(new EventPairMemorizer());


public class EventPair  {
    private final Event previous;
    private final Event current;

    public EventPair(Event previous, Event current) {
        this.previous = previous;
        this.current = current;
    }

    /**
     * @return `null` if no previous events.
     */
    public Event getPrevious() {
        return previous;
    }

    public Event getCurrent() {
        return current;
    }
}

/**
 * Not thread safe has to go on a single thread
 */
public class EventPairMemorizer implements Function<Event, EventPair> {
    Event previous = null;

    EventPair toPair(Event e) {
        EventPair pair = new EventPair(previous, e);
        previous = e;
        return pair;
    }

    @Override
    public EventPair apply(Event current) {
        return toPair(current);
    }
}

Это частично учебное упражнение, частично подтверждение концепции.

Не относящиеся к делу детали

Я пытаюсь использовать mysql-binlog-connector-java для получения потока об изменениях в базе данных.

Итак, если я получаю событие EXT_WRITE_ROWS, предыдущее событие будет TABLE_MAP.Затем я хочу выполнить поиск столбца для события TABLE_MAP (используя jdbc).Затем преобразуйте в некоторую внутреннюю структуру, дружественную JSON.

То же самое относится к событию EXT_UPDATE_ROWS.

Итак, код идеи выглядит следующим образом:

  1. onExternalEvent push для Flux
  2. проверка типа события.При совпадении вызова jdbc в потоке jdbc с использованием Mono
  3. объедините Mono и текущее событие.
  4. сопоставьте с внутренним типом.
  5. отправьте в другой поток.
  6. прибыль

1 Ответ

0 голосов
/ 19 декабря 2018

Как насчет перекрывающихся буферов?

С buffer(2, 1) вы бы открыли буфер для каждого элемента, и каждый буфер будет содержать 2 элемента.

Затем вы можете игнорировать буферы, которые неt завершите интересующим вас событием и получите предыдущее значение для интересующих вас событий ...

...