Мой конвейер Flink в настоящее время использует Pojo, который содержит несколько списков и карт (строк), по линиям
public class MyPojo {
private List<String> myList = new ArrayList<>();
private OtherPojo otherPojo = new OtherPojo();
// getters + setters...
}
public class OtherPojo {
private Map<String, String> myMap = new HashMap<>();
// getters + setters...
}
По соображениям производительности я хочу обойти сериализацию Kryo, поэтому я отключил generi c откат с env.getConfig().disableGenericTypes();
, как описано в документации Flink .
Теперь, Flink жалуется на списки:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
...
Какой предпочтительный способ сериализации таких простых списков и карт во Flink ?. Внутренне это в настоящее время ArrayList
и HashMap
, но другие реализации также подойдут. Кажется, во Flink есть класс org.apache.flink.api.common.typeutils.base.ListSerializer
, но я не знаю, как его использовать.