Почему моя переменная MapState во Flink не сохраняет предыдущие значения? - PullRequest
0 голосов
/ 01 февраля 2019

Я реализую программу Flink на Java для обработки состояний, используя MapStateDescriptor.Я основываю реализацию на этом источнике .По какой-то причине MapState сохраняет предыдущие значения, и я не могу рассчитать среднее значение, которое я хочу.Во время отладки averageTemp всегда пуст, и внутри я никогда не нахожу никаких ключей.Чего мне не хватает в моей реализации?

import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.mqtt.MqttTemperature;
import org.sense.flink.mqtt.TemperatureMqttConsumer;

public class SensorsMultipleReadingMqttEdgentQEP {

    public SensorsMultipleReadingMqttEdgentQEP() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));
        DataStream<MqttTemperature> temperatureStreams = temperatureStream01.union(temperatureStream02)
                .union(temperatureStream03);

        DataStream<Tuple2<String, Double>> average = temperatureStreams.keyBy(new TemperatureKeySelector())
                .map(new AverageTempMapper());
        average.print();

        env.execute("SensorsMultipleReadingMqttEdgentQEP");
    }

    public static class TemperatureKeySelector implements KeySelector<MqttTemperature, Integer> {

        private static final long serialVersionUID = 5905504239899133953L;

        @Override
        public Integer getKey(MqttTemperature value) throws Exception {
            return value.getId();
        }
    }

    public static class AverageTempMapper extends RichMapFunction<MqttTemperature, Tuple2<String, Double>> {

        private static final long serialVersionUID = -5489672634096634902L;
        private MapState<String, Double> averageTemp;

        @Override
        public void open(Configuration parameters) throws Exception {
            averageTemp = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<>("average-temperature", String.class, Double.class));
        }

        @Override
        public Tuple2<String, Double> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            Double temp = value.getTemp();

            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            }
            // NEVER ITERATES ON THE averageTemp
            for (Map.Entry<String, Double> entry: averageTemp.entries()) {
                System.out.println(entry.getKey() + " - " + entry.getValue());
            }

            System.out.println("value: " + value);
            if (averageTemp.contains(key)) { // NEVER CONTAINS A KEY
                System.out.println("yes: " + key);
                temp = (averageTemp.get(key) + value.getTemp()) / 2;
            } else {
                averageTemp.put(key, temp);
            }
            return Tuple2.of(key, temp);
        }
    }
}

** РЕДАКТИРОВАТЬ: ** ОК.Я неправильно понял проблему.Код сохраняет предыдущее состояние на MapState.Я был неправ, потому что я отлаживал код.Но проблема, которую я на самом деле имею, заключается в том, что он запускает более 1 потока и перезаписывает значение моей карты как минимум три раза, прежде чем начинать вычислять среднее значение.

1 Ответ

0 голосов
/ 01 февраля 2019

Состояние в вашей функции карты - на ключ .Поэтому, когда вызывается ваша функция карты и вы получаете состояние карты, оно будет соответствовать любому идентификатору в обрабатываемой записи MqttTemperature.

Учитывая, что вы хотите среднюю температуру для каждой комнаты,Я бы справился с этим следующим образом:

  1. Измените TemperatureKeySelector, чтобы он возвращал room-A, room-B или room-C на основе поля id.
  2. ВAverageTempMapper, имейте две ValueState переменные - одна - сумма температур (удвоенная), а другая - число.Когда вызывается ваш метод map(), если любая из этих двух ValueState переменных равна нулю, инициализируйте его значением 0, а затем суммируйте / увеличивайте.
...