Есть ли у Flink DataStream api, как mapPartition? - PullRequest
1 голос
/ 03 марта 2020

Я хочу использовать не сериализуемый объект в stream.map(), как это

stream.map { i =>
  val obj = new SomeUnserializableClass()
  obj.doSomething(i)
}

Это очень неэффективно, потому что я создаю много экземпляров SomeUnserializableClass. На самом деле, он может быть создан только один раз в каждом работнике.

В Spark я могу использовать mapPartition для этого. Но в API Flink Stream я не знаю.

1 Ответ

3 голосов
/ 03 марта 2020

Если вы имеете дело с не сериализуемым классом, я рекомендую вам создать RichFunction . В вашем случае RichMapFunction.

Оператор Rich в Flink имеет метод open, который выполняется в диспетчере задач только один раз в качестве инициализатора.

Таким образом, хитрость заключается в том, чтобы сделать ваше поле переходным. и создайте его в своем методе open.

Проверьте приведенный ниже пример:

public class NonSerializableFieldMapFunction extends RichMapFunction {

    transient SomeUnserializableClass someUnserializableClass;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.someUnserializableClass = new SomeUnserializableClass();
    }

    @Override
    public Object map(Object o) throws Exception {
        return someUnserializableClass.doSomething(o);
    }
}

Тогда ваш код будет выглядеть так:

stream.map(new NonSerializableFieldMapFunction())

PD: Я использую java синтаксис, пожалуйста, измените его на scala.

...