Flink keyBy POJO - PullRequest
       5

Flink keyBy POJO

0 голосов
/ 29 мая 2019

(извиняюсь за неизбежно ужасное форматирование, которое я выкладываю со своего телефона)

Я делаю keyBy, а затем агрегат, но Flink неправильно группирует данные (вместо этого каждое событие попадает в свою собственную группу).

Пример:

Class Purchase {
    String product;
    Integer quantity;
}

Class Filter {
    String product;

    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((product == bull) ? 0 : displayName.hashCode());
}

Class FilteredPurchase {
    Filter filter;
    Purchase purchase;
}

DataStream<FilteredPurchase> =
    ...
    .keyBy(“filter”) //This works
    .keyBy(x -> x.getFilter()). // This doesn’t 
    .sum(“trade.quantity”);

Если мы рассмотрим случай потока, который выглядит следующим образом:

[
    {“filter”: {“product”: null}, “purchase”: {“product”: “apple”, “quantity”: 10},
    {“filter”: {“product”: null}, “purchase”: {“product”: “apple”, “quantity”: 10},
    {“filter”: {“product”: “apple”}, “purchase”: {“product”: “apple”, “quantity”: 10},
    {“filter”: {“product”: “apple”}, “purchase”: {“product”: “apple”, “quantity”: 10},
]

Я ожидал бы, что он будет разделен на 2 раздела (потому что есть два фильтра), общее количество в каждом из которых равно 20. Однако на самом деле я получаю 4 раздела, каждый из которых имеет общее количество 10.

Интересно, что если я использую версию выражения поля, она будет делать то, что я хочу, но я бы хотела оставить все как POJO, поскольку я собираюсь сделать это позже.

Я что-то здесь упускаю? Может ли KeySelector вернуть POJO?

1 Ответ

0 голосов
/ 30 мая 2019

Первый вопрос: почему вы не просто используете продукт (String) в качестве ключа, поскольку это все, что у вас есть в классе Filter. Так

.keyBy(x -> x.getProduct())

Но в любом случае, я думаю, ваш класс ключей (Filter) должен реализовывать метод equals().

...