Я работаю над системой обнаружения мошенничества, используя Apache Flink, но я новичок и застрял в этой проблеме:
Я хочу создать левое соединение из двух потоков, один из которых содержит текущая транзакция и другая, которая проверила транзакции с банком, где я могу найти, если были какие-то ошибки, такие как stolen_card, et c. Поэтому мне нужно присоединиться к ним, чтобы узнать, была ли отклонена карта в прошлом.
DataStream<Card> currentDataStream = getCardsStream(env, Parameters.CURRENT_SOCKET)
.keyBy((card) -> card.getCardID);
DataStream<Card> historicDataStream = getCardsStream(env, Parameters.HISTORIC_SOCKET)
.keyBy((card) -> card.getCardID());
Сейчас я выполняю функцию RichCoFlatMapFunction, которая обновляет состояние списка с именем исторический список . каждый раз, когда приходит исторический поток данных и возвращает кортеж с текущей картой и список со всеми объединенными вхождениями для этого идентификатора:
public class LeftJoin extends RichCoFlatMapFunction<Card, Card, Tuple2<Card, List<Card>> > {
private ValueState<Card> currentValueState;
private ListState<Card> historicListState;
@Override
public void open(Configuration parameters) throws Exception {
currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
}
@Override
public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
Iterable<Card> historicCardList = historicListState.get();
//If there is a coincidence
if (Iterables.size(historicCardList) > 0) {
out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList) ));
} else {
currentValueState.update(currentCard);
//Returning null if there are no cards for the Id
out.collect(new Tuple2<>(currentCard, null));
}
}
@Override
public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
historicListState.add(historicCard); //Updates the historicListState
}
}
Дело в том, что List<Card>
вызывает у меня много проблем позже когда я хочу сравнить правила с содержащимися картами, потому что он всегда получает все карты снова, и мне нужен способ пометить карты, которые я уже обработал, в соответствии с моими правилами, что-то вроде этого:
//I don't like this list because it always gets me all the join coincidences
for (Card card : historicList) {
//Comparar cada regla del Broadcast state con el error que contiene el elemento card
if (rule.getBankDecision().equals(card.getErrors())) {
//Evaluate some rules
for (Long stateEventTime : windowState.keys()) {
if (isStateValueInWindow(stateEventTime, windowStartForEvent, System.currentTimeMillis())) {
aggregateValuesInState(stateEventTime, aggregator);
}
}
}
Is Есть ли лучший способ получить объединенные карты в виде потока?