Пользовательские члены класса AllWindowFunction - PullRequest
0 голосов
/ 17 сентября 2018

У меня есть пользовательский класс AllWindowFunction, в котором есть член класса, отвечающий за вставки в базу данных. Соединение с базой данных является постоянным и открывается во время построения.

Проблема в том, что экземпляр AllWindowFunction, который создает / открывает соединение, не тот, который вызывается при применении события. Обходной путь для этого - статический член, но я хотел бы знать, является ли это единственным обходным решением?

Пример кода:

public class CustomWindowFunction implements AllWindowFunction<String, String, TimeWindow> {

    private static Connection database;

    CustomWindowFunction() {
        database = new Connection();
    }

    @Override
    public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {
        // process data
        database.save(data);
        out.collect(data.toString());
    }
}

Я не смог найти ничего относительно этого механизма, все, что я знаю, это то, что идентификатор объекта из конструктора отличается от идентификатора объекта, вызываемого из apply.

1 Ответ

0 голосов
/ 17 сентября 2018

Это так, потому что каждая функция должна быть сериализована для распределения по узлам кластера. Однако вы можете попробовать использовать RichAllWindowFunction, так называемую «расширенную» версию, где у вас есть метод open(), который будет вызываться в каждом параллельном операторе при его запуске. В этом методе вы можете создать соединение

public class CustomWindowFunction implements RichAllWindowFunction<String, String, TimeWindow> {

    private Connection database;

    @Override
    public void open(Configuration parameters) {
        database = new Connection();
    }

    @Override
    public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {
        // process data
        database.save(data);
        out.collect(data.toString());
    }
}
...