Я работаю над простым случаем, когда мы объединяем Поток 1 (Позиции) с Потоком 2 (Цена) и связываем последние данные Позиции с данными о цене.
Для этого я расширяю RichCoFlatMapFunction, он создает объект-обертку, который собирает данные из обоих потоков.
В процессе он также сохраняет данные в своем MapState.
В конце дня на основе данных другого потока (например, потока изменения даты) мне необходимо очистить состояния.
Как я могу это сделать?
В основном мне нужно очистить priceState и positionState. Я не уверен, сможем ли мы получить трансляцию для этого?
Пример кода для объединения двух потоков ниже
static final class PositionPriceWrapperBuilder extends RichCoFlatMapFunction<Position, Price, PositionPriceWrapper> {
private transient MapState<String, Price> priceState;
private transient MapState<String, Position> positionState;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Price> descPrice = new MapStateDescriptor<String, Price>(
"priceState",
String.class,
Price.class);
priceState = getRuntimeContext().getMapState(descPrice);
System.out.println("descPrice:: " + descPrice);
//Same thing needs to be done for Price?
MapStateDescriptor<String, Position> descPos = new MapStateDescriptor<String, Position>(
"positionState",
String.class,
Position.class);
positionState = getRuntimeContext().getMapState(descPos);
System.out.println("positionState:: " + positionState);
}
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void flatMap1(Position position, Collector<PositionPriceWrapper> out) throws Exception {
try {
//= pnlState.get(position.getId());
Price price = priceState.get(position.getId());
PositionPriceWrapper ppw = new PositionPriceWrapper();
ppw.setPrice(price);
ppw.setPosition(position);
ppw.setAccount(position.getAccount());
ppw.setCusip(position.getCusip());
System.out.println("Built ppw -->" + ppw);
positionState.put(position.getId(), position);
out.collect(ppw);
}
catch ( Exception e) {
e.printStackTrace();
}
}
@Override
public void flatMap2(Price price, Collector<PositionPriceWrapper> out) throws Exception {
try {
Position position = positionState.get(price.getId());
PositionPriceWrapper ppw = new PositionPriceWrapper();
ppw.setPrice(price);
ppw.setPosition(position);
ppw.setAccount(price.getAccount());
ppw.setCusip(price.getCusip());
priceState.put(price.getId(), price);
out.collect(ppw);
}
catch ( Exception e) {
e.printStackTrace();
}
}
}