Flink сериализация java .util.List и java .util.Map - PullRequest
1 голос
/ 18 января 2020

Мой конвейер Flink в настоящее время использует Pojo, который содержит несколько списков и карт (строк), по линиям

public class MyPojo {
    private List<String> myList = new ArrayList<>();
    private OtherPojo otherPojo = new OtherPojo();

    // getters + setters...
}

public class OtherPojo {
    private Map<String, String> myMap = new HashMap<>();

    // getters + setters...
}

По соображениям производительности я хочу обойти сериализацию Kryo, поэтому я отключил generi c откат с env.getConfig().disableGenericTypes();, как описано в документации Flink .

Теперь, Flink жалуется на списки:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    ...

Какой предпочтительный способ сериализации таких простых списков и карт во Flink ?. Внутренне это в настоящее время ArrayList и HashMap, но другие реализации также подойдут. Кажется, во Flink есть класс org.apache.flink.api.common.typeutils.base.ListSerializer, но я не знаю, как его использовать.

Ответы [ 2 ]

3 голосов
/ 19 января 2020

Мариус уже прекрасно объяснил причину, хотя я не вижу причины, по которой Flink не поддерживает ваш вариант использования из коробки. Тем не менее, я добавлю решение, которое работает прямо сейчас.

// create type info
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class, 
    ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
    ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));

// test it
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");

final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);

DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
assert(myPojo.equals(clone));

Обратите внимание, что ужасный шаблон доступа в тестовом коде просто для быстрой и грязной демонстрации.

2 голосов
/ 18 января 2020

Если вы сделаете:

env.getConfig().disableGenericTypes();

Будет сгенерировано исключение при каждом обнаружении типа данных, который будет go через Kryo.

Так что в этом случае вы должны написать свой собственный сериализатор. Который может быть создан с помощью TypeSerializer, просто вызовите typeInfo.createSerializer(config) для объекта TypeInformation.

Для типов generi c вам необходимо «захватить» информацию о типе generi c через TypeHint, в Ваш случай для списка:

TypeInformation<List<Object>> info = TypeInformation.of(new TypeHint<List<Object>>(){});

ListTypeInfo class

Подробнее в здесь .

...