Если вы имеете дело с не сериализуемым классом, я рекомендую вам создать RichFunction . В вашем случае RichMapFunction.
Оператор Rich в Flink имеет метод open
, который выполняется в диспетчере задач только один раз в качестве инициализатора.
Таким образом, хитрость заключается в том, чтобы сделать ваше поле переходным. и создайте его в своем методе open.
Проверьте приведенный ниже пример:
public class NonSerializableFieldMapFunction extends RichMapFunction {
transient SomeUnserializableClass someUnserializableClass;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.someUnserializableClass = new SomeUnserializableClass();
}
@Override
public Object map(Object o) throws Exception {
return someUnserializableClass.doSomething(o);
}
}
Тогда ваш код будет выглядеть так:
stream.map(new NonSerializableFieldMapFunction())
PD: Я использую java синтаксис, пожалуйста, измените его на scala.