Как мне сериализовать Tuple3 во Flink (Java)? - PullRequest
0 голосов
/ 06 февраля 2019

У меня есть это MqttConsumer в Java для использования сообщений с помощью Flink.Я получаю сообщения mqtt как String 3|TEMPERATURE|1|1|null|25.0 и разделяю их для извлечения каждого значения.И затем я создаю MqttSensor, состоящий из ключа (Tuple3<Integer, String, Tuple2<Integer, Integer>>) и темы (String) и значения (Double).Когда я вызываю метод ctx.collect(mqttMessage); моего SourceContext<MqttSensor> ctx, я получаю исключение, в котором говорится, что я не могу преобразовать Integer в строку.Тем не менее, я думаю, что проблема заключается в сериализации Tuple3 и Tuple2 (источник: https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer). Спасибо

Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt.<init>(MultiSensorMultiStationsReadingMqtt.java:48)
    at org.sense.flink.App.main(App.java:130)
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
Serialization trace:
f1 (org.apache.flink.api.java.tuple.Tuple2)
f2 (org.apache.flink.api.java.tuple.Tuple3)
key (org.sense.flink.mqtt.MqttSensor)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$AutomaticWatermarkContext.processAndCollect(StreamSourceContexts.java:176)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.sense.flink.mqtt.MqttSensorConsumer.run(MqttSensorConsumer.java:75)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Как я могу сериализовать Tuple3 и Tuple2? Вотмой код:

public class MqttSensorConsumer extends RichSourceFunction<MqttSensor> {

    private static final long serialVersionUID = -1384636057411239133L;
    final private static String DEFAUL_HOST = "127.0.0.1";
    final private static int DEFAUL_PORT = 1883;

    private String host;
    private int port;
    private String topic;
    private QoS qos;

    public MqttSensorConsumer(String topic) {
        this(DEFAUL_HOST, DEFAUL_PORT, topic, QoS.AT_LEAST_ONCE);
    }

    public MqttSensorConsumer(String host, String topic) {
        this(host, DEFAUL_PORT, topic, QoS.AT_LEAST_ONCE);
    }

    public MqttSensorConsumer(String host, int port, String topic) {
        this(host, port, topic, QoS.AT_LEAST_ONCE);
    }

    public MqttSensorConsumer(String host, int port, String topic, QoS qos) {
        this.host = host;
        this.port = port;
        this.topic = topic;
        this.qos = qos;
    }

    @Override
    public void run(SourceContext<MqttSensor> ctx) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(host, port);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();

        byte[] qoses = blockingConnection.subscribe(new Topic[] { new Topic(topic, qos) });

        while (blockingConnection.isConnected()) {
            Message message = blockingConnection.receive();
            String payload = new String(message.getPayload());
            String[] arr = payload.split("\\|");

            // @formatter:off
            // 2|TEMPERATURE|1|1|null|25.0
            // @formatter:on
            System.out.println("0: " + arr[0]);
            System.out.println("1: " + arr[1]);
            System.out.println("2: " + arr[2]);
            System.out.println("3: " + arr[3]);
            System.out.println("4: " + arr[4]);
            System.out.println("5: " + arr[5]);
            Tuple3<Integer, String, Tuple2<Integer, Integer>> key = Tuple3.of(Integer.parseInt(arr[0]), arr[1],
                    Tuple2.of(Integer.parseInt(arr[2]), Integer.parseInt(arr[3])));
            MqttSensor mqttMessage = new MqttSensor(message.getTopic(), key, Double.valueOf(arr[5]));
            message.ack();
            ctx.collect(mqttMessage);
        }
        blockingConnection.disconnect();
    }

    @Override
    public void cancel() {
        // TODO Auto-generated method stub
    }
}

1 Ответ

0 голосов
/ 07 февраля 2019

Я использовал TypeInformation для решения.

TypeInformation<Tuple3<Integer, String, Tuple2<Integer, Integer>>> key = TypeInformation.of(new TypeHint<Tuple3<Integer, String, Tuple2<Integer, Integer>>>() { });
...