Flink DataSet Значения кортежей не соответствуют ожидаемым - PullRequest
0 голосов
/ 04 мая 2018

У меня есть Dataset<Tuple3<String,String,Double>> values, который имеет следующие данные:

<Vijaya,Chocolate,5>
<Vijaya,Chips,10>
<Rahul,Chocolate,2>
<Rahul,Chips,8>

Я хочу DataSet<Tuple5<String,String,Double,String,Double>> values1 следующим образом:

<Vijaya,Chocolate,5,Chips,10>
<Rahul,Chocolate,2,Chips,8>

Мой код выглядит следующим образом:

DataSet<Tuple5<String, String, Double, String, Double>> values1 = values.fullOuterJoin(values)
    .where(0)
    .equalTo(0)
    .with(
        new JoinFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>, Tuple5<String, String, Double, String, Double>>() {
            private static final long serialVersionUID = 1L;

            public Tuple5<String, String, Double, String, Double> join(Tuple3<String, String, Double> first, Tuple3<String, String, Double> second) {
                return new Tuple5<String, String, Double, String, Double>(first.f0, first.f1, first.f2, second.f1, second.f2);
            }
        })
    .distinct(1, 3)
    .distinct(1);

В приведенном выше коде я попытался выполнить самостоятельное соединение. Я хочу вывод в этом конкретном формате, но не могу его получить. Как это сделать? Пожалуйста, помогите.

1 Ответ

0 голосов
/ 04 мая 2018

Поскольку вы не хотите, чтобы на выходе повторялся один и тот же элемент, вы можете использовать плоское соединение, в котором вы можете выводить только те записи, у которых значение во 2-й позиции не равно значению в 4-м позиция. Кроме того, если вы хотите только «шоколад» во 2-й позиции, это также можно проверить внутри функции FlatJoin. Ниже приведена ссылка на документацию Флинка о Flat-join.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/batch/dataset_transformations.html#join-with-flat-join-function

Подход с использованием GroupReduceFunction:

     values
    .groupBy(0)
    .reduceGroup(new GroupReduceFunction<Tuple3<String,String,Double>, Tuple2<String, String>>() {

        @Override
        public void reduce(Iterable<Tuple3<String,String,Double>> in, Collector<Tuple2<String, String>> out) {

            StringBuilder output = new StringBuilder();
            String name = null;

            for (Tuple3<String,String,Double> item : in) {          

                name = item.f0;
                output.append(item.f1+","+item.f2+",");         

            }

            out.collect(new Tuple2<String, String>(name,output.toString())); 

        }
    }); 
...