Как мы можем очистить состояние в Flink на основе внешнего события? - PullRequest
1 голос
/ 11 апреля 2019

Я работаю над простым случаем, когда мы объединяем Поток 1 (Позиции) с Потоком 2 (Цена) и связываем последние данные Позиции с данными о цене. Для этого я расширяю RichCoFlatMapFunction, он создает объект-обертку, который собирает данные из обоих потоков. В процессе он также сохраняет данные в своем MapState.

В конце дня на основе данных другого потока (например, потока изменения даты) мне необходимо очистить состояния. Как я могу это сделать? В основном мне нужно очистить priceState и positionState. Я не уверен, сможем ли мы получить трансляцию для этого?

Пример кода для объединения двух потоков ниже

static final class PositionPriceWrapperBuilder extends RichCoFlatMapFunction<Position, Price, PositionPriceWrapper> {



    private transient MapState<String, Price>  priceState;
    private transient MapState<String, Position>  positionState;        

    @Override
    public void open(Configuration parameters) throws Exception {


        MapStateDescriptor<String, Price>  descPrice = new MapStateDescriptor<String, Price>(
                "priceState",
                String.class,
                Price.class);           
        priceState = getRuntimeContext().getMapState(descPrice);                
        System.out.println("descPrice:: " + descPrice);
        //Same thing needs to be done for Price?


        MapStateDescriptor<String, Position>  descPos = new MapStateDescriptor<String, Position>(
                "positionState",
                String.class,
                Position.class);            
        positionState = getRuntimeContext().getMapState(descPos);               
        System.out.println("positionState:: " + positionState);

    }           

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap1(Position position, Collector<PositionPriceWrapper> out) throws Exception {
          try {
              //= pnlState.get(position.getId());
              Price price = priceState.get(position.getId());
              PositionPriceWrapper ppw = new PositionPriceWrapper();
              ppw.setPrice(price);
              ppw.setPosition(position);
              ppw.setAccount(position.getAccount());
              ppw.setCusip(position.getCusip());

              System.out.println("Built ppw -->" + ppw);

              positionState.put(position.getId(), position);
              out.collect(ppw);
          }
          catch ( Exception e) {
              e.printStackTrace();
          }

    }

    @Override
    public void flatMap2(Price price, Collector<PositionPriceWrapper> out) throws Exception {
          try {
              Position position = positionState.get(price.getId());
              PositionPriceWrapper ppw = new PositionPriceWrapper();
              ppw.setPrice(price);
              ppw.setPosition(position);
              ppw.setAccount(price.getAccount());
              ppw.setCusip(price.getCusip());

              priceState.put(price.getId(), price);
              out.collect(ppw);
          }
          catch ( Exception e) {
              e.printStackTrace();
          }         

    }

}

1 Ответ

0 голосов
/ 12 апреля 2019

То, что вы хотите сделать, было бы просто, если бы Флинк предложил оператор с тремя входами, но это не так.Flink поддерживает только операторы с одним или двумя входами.

Одним из вариантов может быть преобразование RichCoFlatMap в функцию CoProcessFunction и использование таймера для запуска очистки состояния.Или полагайтесь на механизм StateTTL для очистки состояния.

Если вам нужно явно запустить очистку состояния, вы могли бы использовать union () для объединения цены и позициипотоков в DataStream<Either<Price, Position>> (или сначала сопоставить оба потока с каким-либо унифицированным типом), а затем соединить этот поток с широковещательным потоком, который имеет сигналы, инициирующие очистку состояния.Или вы можете объединить все три потока вместе, если все они будут одинаково настроены.

...