Не может BroadCastState из функции processElement каждый раз - PullRequest
0 голосов
/ 23 апреля 2019

Я пытаюсь использовать BroadCastState в первый раз. Я проверил это на небольшом примере, следуя документации. Я использовал KeyedBroadcastProcessFunction и обновлял состояние карты из функции processBroadcastElement, но когда я пытался получить состояние из функции processElement, чтобы собрать его. несколько раз он выводит требование, а иногда ничего не выводит. в чем причина этого?

Это код, который используется.

DataStream<Tuple4<String,String,Integer,Integer>> similarityTuples = inputStream
                .keyBy(1)
                .connect(usersBroadCasted)
                .process(new KeyedBroadcastProcessFunction<String, Tuple3<String,String,Float>, String, Tuple4<String,String,Integer,Integer>>() {

                    MapStateDescriptor<Integer, String> usersBroadcastState =
                            new MapStateDescriptor<>(
                                    //"patterns", BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                                    "patterns", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

                    ListState<String> usersLikedItem;

                    @Override
                    public void processElement(Tuple3<String, String, Float> input, ReadOnlyContext readOnlyContext, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {

                        for(String user : usersLikedItem.get()){
                          out.collect(Tuple4.of(user,input.f0,1,0));
                        }
                        usersLikedItem.add(input.f0);

                        for (Map.Entry<Integer, String> entry : readOnlyContext.getBroadcastState(usersBroadcastState).immutableEntries()){                **out.collect(Tuple4.of(input.f0,entry.getValue(),0,10000));**
                        }
                    }

                    @Override
                    public void processBroadcastElement(String s, Context context, Collector<Tuple4<String, String, Integer, Integer>> collector) throws Exception {

                        context.getBroadcastState(usersBroadcastState).put(0,s);

                    }

Я ожидаю этот вывод, и он иногда выводит то, что я ожидал, не меняя ничего в коде (обязательно)

(10,40,0,10000)
(10,20,1,0)
(20,40,0,10000)
(10,30,1,0)
(20,30,1,0)
(30,40,0,10000)
(40,40,0,10000)

но иногда выдает следующее

(10,20,1,0)
(10,30,1,0)
(20,30,1,0)

1 Ответ

0 голосов
/ 24 апреля 2019

Порядок, в котором элементы состояния широковещания и нормальные элементы поступают к оператору, не всегда гарантируется в одном и том же порядке.Это зависит от вышестоящих операторов, которые генерируют элементы.Вот почему иногда вы видите полный вывод, а иногда только вывод без элементов состояния широковещания (все нормальные элементы поступают до элементов состояния широковещания).

Если вы хотите гарантировать, что вы видели все элементыдо определенного момента вам нужно будет дождаться водяных знаков (а также сгенерировать их) и обрабатывать элементы только тогда, когда вы видите соответствующий водяной знак, означающий, что больше элементов с временной отметкой ниже водяного знака не будет.

...