Как десериализовать TypeInformation во Flink (Java)? - PullRequest
0 голосов
/ 07 февраля 2019

Я создал MqttConnector для Flink (Java), и он принимает сообщения в классе, который имеет Tuple3 объект.И тогда я хочу создать MapFunction для сопоставления по ключу.Для этого мне нужно десериализовать объект Tuple3 внутри моего MapFunction.Я создал сериализатор, но я не знаю, как извлечь значения.

Вот мой MqttConnector:

public class MqttSensorConsumer extends RichSourceFunction<MqttSensor> {

    private static final long serialVersionUID = -1384636057411239133L;
    final private static String DEFAUL_HOST = "127.0.0.1";
    final private static int DEFAUL_PORT = 1883;

    private String host;
    private int port;
    private String topic;
    private QoS qos;

    public MqttSensorConsumer(String topic) {
        this(DEFAUL_HOST, DEFAUL_PORT, topic, QoS.AT_LEAST_ONCE);
    }

    public MqttSensorConsumer(String host, String topic) {
        this(host, DEFAUL_PORT, topic, QoS.AT_LEAST_ONCE);
    }

    public MqttSensorConsumer(String host, int port, String topic) {
        this(host, port, topic, QoS.AT_LEAST_ONCE);
    }

    public MqttSensorConsumer(String host, int port, String topic, QoS qos) {
        this.host = host;
        this.port = port;
        this.topic = topic;
        this.qos = qos;
    }

    @Override
    public void run(SourceContext<MqttSensor> ctx) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(host, port);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();

        byte[] qoses = blockingConnection.subscribe(new Topic[] { new Topic(topic, qos) });

        while (blockingConnection.isConnected()) {
            Message message = blockingConnection.receive();
            String payload = new String(message.getPayload());
            String[] arr = payload.split("\\|");

            // @formatter:off
            // SensorKey [id=17, sensorType=TEMPERATURE, platform=Platform [id=1, station=Station [id=2, platforms=null]]]|25.13643935135741
            // @formatter:on

            TypeInformation<Tuple3<Integer, String, Tuple2<Integer, Integer>>> key = TypeInformation
                    .of(new TypeHint<Tuple3<Integer, String, Tuple2<Integer, Integer>>>() {
                    });
            MqttSensor mqttMessage = new MqttSensor(message.getTopic(), key, Double.valueOf(arr[5]));
            message.ack();
            ctx.collect(mqttMessage);
        }
        blockingConnection.disconnect();
    }

    @Override
    public void cancel() {
        // TODO Auto-generated method stub
    }
}

Здесь я использую данные из двух каналов, используя FLink:

DataStream<MqttSensor> streamStation01 = env.addSource(new MqttSensorConsumer("topic-station-01"));
DataStream<MqttSensor> streamStation02 = env.addSource(new MqttSensorConsumer("topic-station-02"));

streamStation01.union(streamStation02).map(new TrainStationMapper(env.getConfig()));

А вот MapFunction, в котором мне нужна помощь для извлечения значений:

public static class TrainStationMapper
        implements MapFunction<MqttSensor, Tuple2<Integer, 
               Tuple3<Integer, String, Tuple2<Integer, Integer>>>> {

private static final long serialVersionUID = -5565228597255633611L;
private ExecutionConfig config;

public TrainStationMapper(ExecutionConfig config) {
    this.config = config;
}

@Override
public Tuple2<Integer, Tuple3<Integer, String, Tuple2<Integer, Integer>>> map(MqttSensor value)
        throws Exception {
    Integer stationKey = 0;
    TypeSerializer<Tuple3<Integer, String, Tuple2<Integer, Integer>>> typeSerializer = value.getKey()
            .createSerializer(config);
    // HOW TO EXTRACT THE VALUES FROM THE Tuple3 and Tuple2 ?
    // typeSerializer.deserialize(reuse, source)
    return Tuple2.of(stationKey, Tuple3.of(null, "", Tuple2.of(null, stationKey)));
}
}

Спасибо

...