(извиняюсь за неизбежно ужасное форматирование, которое я выкладываю со своего телефона)
Я делаю keyBy, а затем агрегат, но Flink неправильно группирует данные (вместо этого каждое событие попадает в свою собственную группу).
Пример:
Class Purchase {
String product;
Integer quantity;
}
Class Filter {
String product;
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((product == bull) ? 0 : displayName.hashCode());
}
Class FilteredPurchase {
Filter filter;
Purchase purchase;
}
DataStream<FilteredPurchase> =
...
.keyBy(“filter”) //This works
.keyBy(x -> x.getFilter()). // This doesn’t
.sum(“trade.quantity”);
Если мы рассмотрим случай потока, который выглядит следующим образом:
[
{“filter”: {“product”: null}, “purchase”: {“product”: “apple”, “quantity”: 10},
{“filter”: {“product”: null}, “purchase”: {“product”: “apple”, “quantity”: 10},
{“filter”: {“product”: “apple”}, “purchase”: {“product”: “apple”, “quantity”: 10},
{“filter”: {“product”: “apple”}, “purchase”: {“product”: “apple”, “quantity”: 10},
]
Я ожидал бы, что он будет разделен на 2 раздела (потому что есть два фильтра), общее количество в каждом из которых равно 20. Однако на самом деле я получаю 4 раздела, каждый из которых имеет общее количество 10.
Интересно, что если я использую версию выражения поля, она будет делать то, что я хочу, но я бы хотела оставить все как POJO, поскольку я собираюсь сделать это позже.
Я что-то здесь упускаю? Может ли KeySelector вернуть POJO?