Объединить несколько потоков объединиться в Flink - PullRequest
0 голосов
/ 07 февраля 2020

У меня есть три разных потока, поступающих из разных источников ( Объекты: Trade,MarketData, WeightAdj, единственное, что у них общего - это «продукт». Вот мои потоки.

Торговый поток: tradeid, продукт, исполнения

Поток MarketData: product, marketData

Поток вычислений: product, factor

Чего я хочу достичь с помощью Flink Я хочу объединить все три потока и получить самое последнее значение Tuple3<Trade,MarketData,WeightAdj >. Это означает, что каждый раз, когда какой-либо из этих потоков генерирует событие, я должен получить самое позднее значение Tuple3<Trade,MarketData,WeightAdj>

Я попытался объединить эти потоки, используя функцию 'connect', за которой следует keyBy, но он не создает объект Enriched, если генерируются события MarketData или WeightAdj.

public static void main(String[] args) throws Exception {
// some code 
  tradeStream.connect(marketStream)
    .keyBy(
            new KeySelector<Trade, String>() {
                @Override
                public String getKey(Trade trd) throws Exception {
                    return trd.product;
                }
            }, new KeySelector<MarketData, String>() {
                @Override
                public String getKey(MarketData marketData)
                        throws Exception {
                    return marketData.product;
                }
            }

    )
    .flatMap(new JoinRichCoFlatMapFunction())
    .connect(weightStream)
    .keyBy(new KeySelector<Tuple2<Trade, MarketData>, String>() {
        @Override
        public String getKey(Tuple2<Trade, MarketData> trd) throws Exception {
            return trd.f0.product;
        }
    }, new KeySelector<WeightAdj, String>() {
        @Override
        public String getKey(WeightAdj wght) throws Exception {
            return wght.product;
        }
    })      
    .flatMap(new TupleWeightJionRichCoFlatMapFunction())
    .print();
}

public static final class JoinRichCoFlatMapFunction extends RichCoFlatMapFunction<Trade, MarketData, Tuple2<Trade, MarketData>>{

    private ValueState<Trade> trades;
    private ValueState<MarketData> marketData;

    @Override
    public void open(Configuration config) {
        trades = getRuntimeContext().getState(new ValueStateDescriptor<>("Trades", Trade.class));
        marketData = getRuntimeContext().getState(new ValueStateDescriptor<>("MarketData", MarketData.class));
    }

    @Override
    public void flatMap1(Trade trd,Collector<Tuple2<Trade, MarketData>> out) throws Exception {

        MarketData mktData = marketData.value();
        if (mktData != null) {
            marketData.clear();
            out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
        } else {
            trades.update(trd);;
        }
    }

    @Override
    public void flatMap2(MarketData mktData,Collector<Tuple2<Trade, MarketData>> out) throws Exception {

        Trade trd = trades.value();
        if (trd != null) {
            trades.clear();
            out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
        } else {
            marketData.update(mktData);;
        }
    }
}

public static final class TupleWeightJionRichCoFlatMapFunction extends RichCoFlatMapFunction<Tuple2<Trade, MarketData>, WeightAdj, Tuple3<Trade, MarketData, WeightAdj>>{

    private ValueState<Tuple2<Trade, MarketData>> tradeMarketState;
    private ValueState<WeightAdj> weightState;

    @Override
    public void open(Configuration config) {

        TypeInformation<Tuple2<Trade, MarketData>> info = TypeInformation.of(new TypeHint<Tuple2<Trade, MarketData>>(){});
        tradeMarketState = getRuntimeContext().getState(new ValueStateDescriptor<>("Trades", info));
        weightState = getRuntimeContext().getState(new ValueStateDescriptor<>("Weights", WeightAdj.class));
    }

    @Override
    public void flatMap1(Tuple2<Trade, MarketData> trdWithMaktData, Collector<Tuple3<Trade, MarketData, WeightAdj>> out)
            throws Exception {

        WeightAdj weigt = weightState.value();
        if (weigt != null) {
            weightState.clear();
            out.collect(new Tuple3<Trade, MarketData, WeightAdj>(trdWithMaktData.f0, trdWithMaktData.f1, weigt));
        } else {
            tradeMarketState.update(trdWithMaktData);;
        }
    }

    @Override
    public void flatMap2(WeightAdj weightData,Collector<Tuple3<Trade, MarketData, WeightAdj>> out) throws Exception {

        Tuple2<Trade, MarketData> trdWithMktData = tradeMarketState.value();
        if (trdWithMktData != null) {
            tradeMarketState.clear();
            out.collect(new Tuple3<Trade, MarketData, WeightAdj>(trdWithMktData.f0, trdWithMktData.f1, weightData));
        } else {
            weightState.update(weightData);;
        }
    }
}

Есть идеи, что я делаю неправильно?

1 Ответ

2 голосов
/ 07 февраля 2020

Если я правильно понимаю ваши цели, есть пара моментов, которые нужно обрабатывать по-разному:

  • Не звоните clear() в любом штате, потому что вам нужно продолжить чтобы запомнить последнее значение, которое вы видели в каждом из трех потоков.
  • Всегда вызывайте out.collect(). Если вызывается flatmap1 или flatmap2, это означает, что что-то было обновлено, поэтому есть что-то новое, о чем следует сообщить.

(похоже, вы имитируете использованную логику c в упражнении RidesAndFares из обучения Flink. В этом упражнении требования различаются: в этом случае существует пара событий Ride и Fare, которые необходимо объединить единовременно. пара Ride / Fare для данного ездового идентификатора, соединение для этого ездового идентификатора выполнено.)

А теперь пара предостережений:

  • Если вы никогда не позвоните clear() и если пространство продукта не ограничено, тогда вы будете постоянно увеличивать количество состояний на неопределенный срок. Если это проблема, вы можете использовать состояние TTL для организации очистки устаревшего состояния.
  • Имейте в виду, что сериализатор Tuple не может обрабатывать нули, если он используется с RocksDB , Я хотел бы переписать каждый из ваших методов плоской карты следующим образом:
public void flatMap1(Trade trd, Collector<Tuple2<Trade, MarketData>> out) throws Exception {

    trades.update(trd);;
    MarketData mktData = marketData.value();
    out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
}

, но при запуске приложения это может привести к Tuple2, где mktData равно нулю. Поэтому было бы неплохо защититься от этого.

Как уже упоминал Арвид, Table / SQL API упрощает такие виды соединений.

...