У меня есть этот MqttConsumer
на Java для получения сообщений с помощью Flink. Я получаю сообщения MQTT в виде строки (String) 3|TEMPERATURE|1|1|null|25.0
и разделяю их, чтобы извлечь каждое значение. Затем я создаю MqttSensor
, состоящий из ключа (Tuple3<Integer, String, Tuple2<Integer, Integer>>
) и темы (String
) и значения (Double
). Когда я вызываю метод ctx.collect(mqttMessage);
моего SourceContext<MqttSensor> ctx
, я получаю исключение, в котором говорится, что Integer нельзя привести к String. Тем не менее, я думаю, что проблема заключается в сериализации Tuple3
и Tuple2
(источник: https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer). Спасибо.
Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt.<init>(MultiSensorMultiStationsReadingMqtt.java:48)
at org.sense.flink.App.main(App.java:130)
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
Serialization trace:
f1 (org.apache.flink.api.java.tuple.Tuple2)
f2 (org.apache.flink.api.java.tuple.Tuple3)
key (org.sense.flink.mqtt.MqttSensor)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$AutomaticWatermarkContext.processAndCollect(StreamSourceContexts.java:176)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.sense.flink.mqtt.MqttSensorConsumer.run(MqttSensorConsumer.java:75)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Как я могу сериализовать Tuple3
и Tuple2
? Вот мой код:
/**
 * Flink source that subscribes to a single MQTT topic and emits one
 * {@link MqttSensor} per received message.
 *
 * <p>Every payload is expected to be a pipe-separated record with at least six
 * fields, for example {@code 2|TEMPERATURE|1|1|null|25.0}. Fields 0-3 are
 * combined into the sensor key, field 4 is not used by this class, and field 5
 * is parsed as the sensor value.
 */
public class MqttSensorConsumer extends RichSourceFunction<MqttSensor> {
    private static final long serialVersionUID = -1384636057411239133L;

    private static final String DEFAULT_HOST = "127.0.0.1";
    private static final int DEFAULT_PORT = 1883;

    /** Minimum number of pipe-separated fields a payload must contain. */
    private static final int MIN_FIELDS = 6;

    /** How long one receive attempt waits before the stop flag is re-checked. */
    private static final long RECEIVE_TIMEOUT_MS = 500L;

    private final String host;
    private final int port;
    private final String topic;
    private final QoS qos;

    /**
     * Cleared by {@link #cancel()} to make {@link #run} leave its loop.
     * Deliberately NOT transient: the function instance is shipped via Java
     * serialization, and a transient flag would come back as {@code false}.
     */
    private volatile boolean running = true;

    /**
     * Creates a consumer for {@code topic} on the default host and port with
     * {@link QoS#AT_LEAST_ONCE}.
     *
     * @param topic MQTT topic to subscribe to
     */
    public MqttSensorConsumer(String topic) {
        this(DEFAULT_HOST, DEFAULT_PORT, topic, QoS.AT_LEAST_ONCE);
    }

    /**
     * Creates a consumer for {@code topic} on {@code host}, using the default
     * port and {@link QoS#AT_LEAST_ONCE}.
     *
     * @param host  MQTT broker host
     * @param topic MQTT topic to subscribe to
     */
    public MqttSensorConsumer(String host, String topic) {
        this(host, DEFAULT_PORT, topic, QoS.AT_LEAST_ONCE);
    }

    /**
     * Creates a consumer for {@code topic} on {@code host:port} with
     * {@link QoS#AT_LEAST_ONCE}.
     *
     * @param host  MQTT broker host
     * @param port  MQTT broker port
     * @param topic MQTT topic to subscribe to
     */
    public MqttSensorConsumer(String host, int port, String topic) {
        this(host, port, topic, QoS.AT_LEAST_ONCE);
    }

    /**
     * Creates a consumer with every connection setting given explicitly.
     *
     * @param host  MQTT broker host
     * @param port  MQTT broker port
     * @param topic MQTT topic to subscribe to
     * @param qos   quality of service requested for the subscription
     */
    public MqttSensorConsumer(String host, int port, String topic, QoS qos) {
        this.host = host;
        this.port = port;
        this.topic = topic;
        this.qos = qos;
    }

    /**
     * Connects to the broker, subscribes to the configured topic and forwards
     * each message to {@code ctx} as an {@link MqttSensor} until the source is
     * cancelled or the connection reports that it is no longer connected.
     *
     * @param ctx Flink source context that receives the parsed records
     * @throws IllegalArgumentException if a payload has fewer than six fields
     *         or a numeric field cannot be parsed
     * @throws Exception if connecting, subscribing, receiving or disconnecting fails
     */
    @Override
    public void run(SourceContext<MqttSensor> ctx) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(host, port);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        try {
            blockingConnection.subscribe(new Topic[] { new Topic(topic, qos) });
            while (running && blockingConnection.isConnected()) {
                // Timed receive so that a cancel() is noticed even when no
                // messages arrive; a null result means the wait timed out.
                Message message = blockingConnection.receive(RECEIVE_TIMEOUT_MS,
                        java.util.concurrent.TimeUnit.MILLISECONDS);
                if (message == null) {
                    continue;
                }
                // Decode with an explicit charset instead of the platform default.
                String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                MqttSensor mqttMessage = toSensor(message.getTopic(), payload);
                message.ack();
                ctx.collect(mqttMessage);
            }
        } finally {
            // Release the connection on every exit path, including failures.
            blockingConnection.disconnect();
        }
    }

    /**
     * Requests termination of {@link #run}; the loop exits after the current
     * receive attempt returns or times out.
     */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Parses one pipe-separated payload into an {@link MqttSensor}.
     *
     * @param messageTopic topic the message was received on
     * @param payload      raw payload text, e.g. {@code 2|TEMPERATURE|1|1|null|25.0}
     * @return the sensor record built from fields 0-3 (key) and field 5 (value)
     * @throws IllegalArgumentException if the payload has fewer than six fields
     *         or a numeric field cannot be parsed
     */
    private static MqttSensor toSensor(String messageTopic, String payload) {
        String[] fields = payload.split("\\|");
        if (fields.length < MIN_FIELDS) {
            throw new IllegalArgumentException("Expected at least " + MIN_FIELDS
                    + " '|'-separated fields but got " + fields.length + " in payload: " + payload);
        }
        Tuple2<Integer, Integer> position = Tuple2.of(Integer.parseInt(fields[2]), Integer.parseInt(fields[3]));
        Tuple3<Integer, String, Tuple2<Integer, Integer>> key = Tuple3.of(Integer.parseInt(fields[0]), fields[1],
                position);
        return new MqttSensor(messageTopic, key, Double.valueOf(fields[5]));
    }
}