Я реализую пакетное задание Flink, которое должно объединить два набора данных с помощью преобразования coGroup .Цель состоит в том, чтобы объединить пользователей с заказами по userId (отношение 1-n).Заказы в группе должны быть отсортированы по цене ASC перед входом в CoGroupFunction.Я пытаюсь использовать операцию sortSecondGroup для применения сортировки, но она не дает никакого эффекта, а данные остаются несортированными.
Пример
users (userId, userName) :
{0, john}, {1, jane}, {2, richard}
заказов (userId, price, productName) :
{0, 200, phone}, {0, 5, soap}, {0, 50, book},
{1, 30, shirt}, {1, 15, potato},
{2, 500, laptop},{2, 10, pen}, {2, 300, headphones}
result (userName, [productName1, productName2]) - продукты пользователя сортируются по цене :
{john, [soap, book, phone]},
{jane, [potato, shirt]},
{richard, [pen, headphones, laptop]}
структура работы :
users
.coGroup(orders)
.where(tuple1 -> tuple1.f0)
.equalTo(tuple2 -> tuple2.f0)
//sort orders by price
.sortSecondGroup(1, Order.ASCENDING)
.with(CoGroupJob::joinUserNameWithProductName);
Ожидается: Я ожидал, что после sortSecondGroup набор данных рабочих порядков будет отсортирован и применен к CoGroupJob::joinUserNameWithProductName
в указанном порядке.
Actual: sortSecondGroup не оказывает никакого влияния, и данные остаются несортированными.
Версия Flink: 1.6.0
PS: я могу отсортировать набор данных целых заказов перед coGroup, но наборы данных огромныи работа становится намного медленнее (~ 10 раз).С другой стороны, сортировка данных в CoGroupJob::joinUserNameWithProductName
с Collections.sort () дает OutOfMemoryError из-за большого объема данных в куче.
Java-код для воспроизведения текущей проблемы:
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
public class CoGroupJob {
public static DataSet<Tuple2<String, List<String>>> processOrders(
DataSet<Tuple2<Integer, String>> users,
DataSet<Tuple3<Integer, Integer, String>> orders
) {
return users
.coGroup(orders)
.where(tuple1 -> tuple1.f0)
.equalTo(tuple2 -> tuple2.f0)
//sort orders by price
.sortSecondGroup(1, Order.ASCENDING)
.with(CoGroupJob::joinUserNameWithProductName);
}
private static void joinUserNameWithProductName(Iterable<Tuple2<Integer, String>> users, Iterable<Tuple3<Integer, Integer, String>> orders, Collector<Tuple2<String, List<String>>> out) {
String userName = users.iterator().next().f1;
List<String> orderNames = new ArrayList<>();
orders.iterator().forEachRemaining(order -> orderNames.add(order.f2));
Tuple2<String, List<String>> namesWithOrders = Tuple2.of(userName, orderNames);
out.collect(namesWithOrders);
}
}
И тест:
import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
class CoGroupJobTest {
@Test
void shouldJoinUsersWithOrdersSortedByPrice(ExecutionEnvironment env) throws Exception {
List<Tuple2<String, List<String>>> userOrders = CoGroupJob.processOrders(
users(env, ImmutableList.of(
Tuple2.of(0, "john"), Tuple2.of(1, "jane"), Tuple2.of(2, "richard")
)),
orders(env, ImmutableList.of(
Tuple3.of(0, 200, "phone"), Tuple3.of(0, 5, "soap"), Tuple3.of(0, 50, "book"),
Tuple3.of(1, 30, "shirt"), Tuple3.of(1, 15, "potato"),
Tuple3.of(2, 500, "laptop"), Tuple3.of(2, 10, "pen"), Tuple3.of(2, 300, "headphones")
))
).collect();
assertThat(userOrders)
.contains(
Tuple2.of("john", ImmutableList.of("soap", "book", "phone")),
Tuple2.of("jane", ImmutableList.of("potato", "shirt")),
Tuple2.of("richard", ImmutableList.of("pen", "headphones", "laptop"))
);
}
private static DataSource<Tuple2<Integer, String>> users(ExecutionEnvironment env, List<Tuple2<Integer, String>> users) {
return env.fromCollection(users, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}));
}
private static DataSource<Tuple3<Integer, Integer, String>> orders(ExecutionEnvironment env, List<Tuple3<Integer, Integer, String>> orders) {
return env.fromCollection(orders, TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>(){}));
}
}