Flink Как DataStream объединяет пользовательский POJO в другой DataStream - PullRequest
0 голосов
/ 07 марта 2020

Я хочу преобразовать DataStream в DataStream с информацией о схеме

input

args [0] DataStream

{"fields":["China","Beijing"]}

args [1] schema

message spark_schema {
  optional binary country (UTF8);
  optional binary city (UTF8);
}

ожидаемый вывод

{"country":"china", "city":"beijing"}

мой код такой

public DataStream<String> convert(DataStream source, MessageType messageType) {

        SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
            JSONObject data = new JSONObject();
            this.fields = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
            for (int i = 0; i < fields.size(); i++) {
                data.put(fields.get(i), row.getField(i));
            }
            return data.toJSONString();
        });
        return dataWithSchema;
    }

Исключительные ошибки

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.xxxx.ParquetDataSourceReader$$Lambda$64/1174881426@d78795 is not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1823)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
    at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)

Но приведенный ниже код работает нормально

public DataStream<String> convert(DataStream source, MessageType messageType) {
        if (this.fields == null) {
            throw new RuntimeException("The schema of AbstractRowStreamReader is null");
        }

        List<String> field = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
        SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
            JSONObject data = new JSONObject();
            for (int i = 0; i < field.size(); i++) {
                data.put(field.get(i), row.getField(i));
            }
            return data.toJSONString();
        });
        return dataWithSchema;
    }

Оператор карты Флинка, как объединить внешний комплекс POJO?

1 Ответ

0 голосов
/ 09 марта 2020

Чтобы Flink распределял код по задачам, код должен быть полностью Serializable. В вашем первом примере это не так; во втором это так. В частности, Type::getName сгенерирует лямбду, отличную от Serializable.

Чтобы получить лямбда, равную Serializable, необходимо явно привести ее к сериализуемому интерфейсу (например, Flink MapFunction). или используйте приведение с помощью (Serializable & Function)

Поскольку второе также сохраняет вычисления, было бы лучше в любом случае. Преобразование будет выполнено только один раз во время компиляции задания, в то время как DataStream#map вызывается для каждой записи. Если это не ясно, я рекомендую выполнить его в IDE и поиграть с точками останова.

...