Я создал MqttConnector
для Flink (Java), и он принимает сообщения в классе, который имеет Tuple3
объект.И тогда я хочу создать MapFunction
для сопоставления по ключу.Для этого мне нужно десериализовать объект Tuple3
внутри моего MapFunction
.Я создал сериализатор, но я не знаю, как извлечь значения.
Вот мой MqttConnector:
public class MqttSensorConsumer extends RichSourceFunction<MqttSensor> {
private static final long serialVersionUID = -1384636057411239133L;
final private static String DEFAUL_HOST = "127.0.0.1";
final private static int DEFAUL_PORT = 1883;
private String host;
private int port;
private String topic;
private QoS qos;
public MqttSensorConsumer(String topic) {
this(DEFAUL_HOST, DEFAUL_PORT, topic, QoS.AT_LEAST_ONCE);
}
public MqttSensorConsumer(String host, String topic) {
this(host, DEFAUL_PORT, topic, QoS.AT_LEAST_ONCE);
}
public MqttSensorConsumer(String host, int port, String topic) {
this(host, port, topic, QoS.AT_LEAST_ONCE);
}
public MqttSensorConsumer(String host, int port, String topic, QoS qos) {
this.host = host;
this.port = port;
this.topic = topic;
this.qos = qos;
}
@Override
public void run(SourceContext<MqttSensor> ctx) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host, port);
BlockingConnection blockingConnection = mqtt.blockingConnection();
blockingConnection.connect();
byte[] qoses = blockingConnection.subscribe(new Topic[] { new Topic(topic, qos) });
while (blockingConnection.isConnected()) {
Message message = blockingConnection.receive();
String payload = new String(message.getPayload());
String[] arr = payload.split("\\|");
// @formatter:off
// SensorKey [id=17, sensorType=TEMPERATURE, platform=Platform [id=1, station=Station [id=2, platforms=null]]]|25.13643935135741
// @formatter:on
TypeInformation<Tuple3<Integer, String, Tuple2<Integer, Integer>>> key = TypeInformation
.of(new TypeHint<Tuple3<Integer, String, Tuple2<Integer, Integer>>>() {
});
MqttSensor mqttMessage = new MqttSensor(message.getTopic(), key, Double.valueOf(arr[5]));
message.ack();
ctx.collect(mqttMessage);
}
blockingConnection.disconnect();
}
@Override
public void cancel() {
// TODO Auto-generated method stub
}
}
Здесь я использую данные из двух каналов, используя FLink:
DataStream<MqttSensor> streamStation01 = env.addSource(new MqttSensorConsumer("topic-station-01"));
DataStream<MqttSensor> streamStation02 = env.addSource(new MqttSensorConsumer("topic-station-02"));
streamStation01.union(streamStation02).map(new TrainStationMapper(env.getConfig()));
А вот MapFunction
, в котором мне нужна помощь для извлечения значений:
public static class TrainStationMapper
implements MapFunction<MqttSensor, Tuple2<Integer,
Tuple3<Integer, String, Tuple2<Integer, Integer>>>> {
private static final long serialVersionUID = -5565228597255633611L;
private ExecutionConfig config;
public TrainStationMapper(ExecutionConfig config) {
this.config = config;
}
@Override
public Tuple2<Integer, Tuple3<Integer, String, Tuple2<Integer, Integer>>> map(MqttSensor value)
throws Exception {
Integer stationKey = 0;
TypeSerializer<Tuple3<Integer, String, Tuple2<Integer, Integer>>> typeSerializer = value.getKey()
.createSerializer(config);
// HOW TO EXTRACT THE VALUES FROM THE Tuple3 and Tuple2 ?
// typeSerializer.deserialize(reuse, source)
return Tuple2.of(stationKey, Tuple3.of(null, "", Tuple2.of(null, stationKey)));
}
}
Спасибо