Групповая сортировка в Flink CoGroupOperator не работает должным образом - PullRequest
0 голосов
/ 21 января 2019

Я реализую пакетное задание Flink, которое должно объединить два набора данных с помощью преобразования coGroup .Цель состоит в том, чтобы объединить пользователей с заказами по userId (отношение 1-n).Заказы в группе должны быть отсортированы по цене ASC перед входом в CoGroupFunction.Я пытаюсь использовать операцию sortSecondGroup для применения сортировки, но она не дает никакого эффекта, а данные остаются несортированными.

Пример

users (userId, userName) :

{0, john}, {1, jane}, {2, richard}

заказов (userId, price, productName) :

{0, 200, phone}, {0, 5, soap}, {0, 50, book},
{1, 30, shirt},  {1, 15, potato},
{2, 500, laptop},{2, 10, pen}, {2, 300, headphones}

result (userName, [productName1, productName2]) - продукты пользователя сортируются по цене :

{john, [soap, book, phone]},
{jane, [potato, shirt]},
{richard, [pen, headphones, laptop]}

структура работы :

users
   .coGroup(orders)
   .where(tuple1 -> tuple1.f0)
   .equalTo(tuple2 -> tuple2.f0)
    //sort orders by price
   .sortSecondGroup(1, Order.ASCENDING)
   .with(CoGroupJob::joinUserNameWithProductName);

Ожидается: Я ожидал, что после sortSecondGroup набор данных рабочих порядков будет отсортирован и применен к CoGroupJob::joinUserNameWithProductName в указанном порядке.

Actual: sortSecondGroup не оказывает никакого влияния, и данные остаются несортированными.

Версия Flink: 1.6.0

PS: я могу отсортировать набор данных целых заказов перед coGroup, но наборы данных огромныи работа становится намного медленнее (~ 10 раз).С другой стороны, сортировка данных в CoGroupJob::joinUserNameWithProductName с Collections.sort () дает OutOfMemoryError из-за большого объема данных в куче.

Java-код для воспроизведения текущей проблемы:

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class CoGroupJob {

    public static DataSet<Tuple2<String, List<String>>> processOrders(
            DataSet<Tuple2<Integer, String>> users,
            DataSet<Tuple3<Integer, Integer, String>> orders
    ) {
        return users
                .coGroup(orders)
                .where(tuple1 -> tuple1.f0)
                .equalTo(tuple2 -> tuple2.f0)
                //sort orders by price
                .sortSecondGroup(1, Order.ASCENDING)
                .with(CoGroupJob::joinUserNameWithProductName);
    }

    private static void joinUserNameWithProductName(Iterable<Tuple2<Integer, String>> users, Iterable<Tuple3<Integer, Integer, String>> orders, Collector<Tuple2<String, List<String>>> out) {

        String userName = users.iterator().next().f1;

        List<String> orderNames = new ArrayList<>();
        orders.iterator().forEachRemaining(order -> orderNames.add(order.f2));

        Tuple2<String, List<String>> namesWithOrders = Tuple2.of(userName, orderNames);
        out.collect(namesWithOrders);
    }
}


И тест:


import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import org.junit.jupiter.api.Test;

import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;


class CoGroupJobTest {

    @Test
    void shouldJoinUsersWithOrdersSortedByPrice(ExecutionEnvironment env) throws Exception {

        List<Tuple2<String, List<String>>> userOrders = CoGroupJob.processOrders(
                users(env, ImmutableList.of(
                        Tuple2.of(0, "john"), Tuple2.of(1, "jane"), Tuple2.of(2, "richard")
                )),

                orders(env, ImmutableList.of(
                        Tuple3.of(0, 200, "phone"),  Tuple3.of(0, 5,  "soap"),  Tuple3.of(0, 50,  "book"),
                        Tuple3.of(1, 30,  "shirt"),  Tuple3.of(1, 15, "potato"),
                        Tuple3.of(2, 500, "laptop"), Tuple3.of(2, 10, "pen"),   Tuple3.of(2, 300, "headphones")
                ))
        ).collect();

        assertThat(userOrders)
                .contains(
                        Tuple2.of("john",    ImmutableList.of("soap", "book", "phone")),
                        Tuple2.of("jane",    ImmutableList.of("potato", "shirt")),
                        Tuple2.of("richard", ImmutableList.of("pen", "headphones", "laptop"))
                );
    }

    private static DataSource<Tuple2<Integer, String>> users(ExecutionEnvironment env, List<Tuple2<Integer, String>> users) {
        return env.fromCollection(users, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}));
    }

    private static DataSource<Tuple3<Integer, Integer, String>> orders(ExecutionEnvironment env, List<Tuple3<Integer, Integer, String>> orders) {
        return env.fromCollection(orders, TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>(){}));
    }
}
...