Правильный способ сделать Left Join на двух потоках в Apache Flink - PullRequest
0 голосов
/ 06 апреля 2020

Я работаю над системой обнаружения мошенничества, используя Apache Flink, но я новичок и застрял в этой проблеме:

Я хочу создать левое соединение из двух потоков, один из которых содержит текущая транзакция и другая, которая проверила транзакции с банком, где я могу найти, если были какие-то ошибки, такие как stolen_card, et c. Поэтому мне нужно присоединиться к ним, чтобы узнать, была ли отклонена карта в прошлом.

   DataStream<Card> currentDataStream =  getCardsStream(env, Parameters.CURRENT_SOCKET)
            .keyBy((card) -> card.getCardID);

    DataStream<Card> historicDataStream =  getCardsStream(env, Parameters.HISTORIC_SOCKET)
            .keyBy((card) -> card.getCardID()); 

Сейчас я выполняю функцию RichCoFlatMapFunction, которая обновляет состояние списка с именем исторический список . каждый раз, когда приходит исторический поток данных и возвращает кортеж с текущей картой и список со всеми объединенными вхождениями для этого идентификатора:

public class LeftJoin extends RichCoFlatMapFunction<Card, Card, Tuple2<Card, List<Card>> > {

    private ValueState<Card> currentValueState;
    private ListState<Card> historicListState;

    @Override
    public void open(Configuration parameters) throws Exception {
        currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
        historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
    }

    @Override
    public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        Iterable<Card> historicCardList =  historicListState.get();

        //If there is a coincidence
        if (Iterables.size(historicCardList) > 0) {
            out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList) ));
        } else {
            currentValueState.update(currentCard);
            //Returning null if there are no cards for the Id
            out.collect(new Tuple2<>(currentCard, null));
        }
    }

    @Override
    public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        historicListState.add(historicCard); //Updates the historicListState
    }
}

Дело в том, что List<Card> вызывает у меня много проблем позже когда я хочу сравнить правила с содержащимися картами, потому что он всегда получает все карты снова, и мне нужен способ пометить карты, которые я уже обработал, в соответствии с моими правилами, что-то вроде этого:

  //I don't like this list because it always gets me all the join coincidences
    for (Card card : historicList) {

        //Comparar cada regla del Broadcast state con el error que contiene el elemento card
        if (rule.getBankDecision().equals(card.getErrors())) {


            //Evaluate some rules
            for (Long stateEventTime : windowState.keys()) {
                if (isStateValueInWindow(stateEventTime, windowStartForEvent, System.currentTimeMillis())) {
                    aggregateValuesInState(stateEventTime, aggregator);
                }

            }
    }

Is Есть ли лучший способ получить объединенные карты в виде потока?

1 Ответ

0 голосов
/ 11 апреля 2020

Надеюсь, я вас правильно понял, если нет, пожалуйста, запишите меня.

  1. private ValueState<Card> currentValueState является избыточным (в этом примере вы только обновляете его и никогда не читаете его значение)
  2. Если я правильно вас понимаю, проблема в том, что вы запускаете вашу RuleSystem на весь исторический список, хотя вы уже проверили некоторые из них. почему бы не удалить из карты исторических списков, которые уже прошли правила?
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...