Как сменить операторов во время выполнения программы Stream Dataflow? - PullRequest
0 голосов
/ 20 декабря 2018

Интересно, могу ли я сменить операторов на работу, уже представленную Flink.Предположим, у меня есть программа для подсчета слов и фильтр для подсчета только слов, длина которых превышает 3 символа.Я хочу изменить параметры этого фильтра во время выполнения.Мое первое предположение состоит в том, что Flink (и другие механизмы обработки данных Spark, Storm, Apache Edgent) не могут этого сделать, поскольку задание уже было отправлено в env.execute().Кто-нибудь знает какой-либо подход к этому?

Я думаю, этот вопрос ( Развертывание топологии обработки потока во время выполнения? ) связан с тем, что я хочу, но решение все еще не динамично, как я хочу.

Спасибо

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9000)
        .flatMap(new SplitterFlatMap()).keyBy(0)
        .sum(1)
        .filter(word -> word.f1 >= 3);
dataStream.print();
env.execute("WordCountSocketFilterQEP");

Ответы [ 3 ]

0 голосов
/ 20 декабря 2018

С помощью Flink вы можете подключить широковещательный поток к ключевому потоку и транслировать параметры или код, который вы хотите использовать. TaxiQuery - это пример использования Janino с выражениями Java, но вы, вероятно, могли бы динамически загрузить класс.Я также видел, как это делается с Rhino / Javascript, JRuby и т. Д.

0 голосов
/ 20 декабря 2018

чтобы ваши значения parameterStream были отправлены всем операторам, вы должны использовать BroadcastStream.Обратите внимание, что (по состоянию на Flink 1.6?) Это также позволяет вам поддерживать состояние широковещания, где «правила» или настройки конфигурации, которые вы отправляете во все экземпляры вашего DynamicFilterCoFlatMapper, будут автоматически сохраняться как состояния.

0 голосов
/ 20 декабря 2018

Я думаю, во Flink я могу использовать CoFlatMapFunction -> Flink: как обрабатывать изменения конфигурации внешнего приложения в flink .Но в Apache Edgent я не уверен, есть ли способ сделать это .... Вот моя реализация>

package org.sense.flink.examples.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.sense.flink.mqtt.FlinkMqttConsumer;
import org.sense.flink.mqtt.MqttMessage;

public class SensorsDynamicFilterMqttEdgentQEP {

    public SensorsDynamicFilterMqttEdgentQEP() throws Exception {

        // Start streaming from fake data source sensors
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // obtain execution environment, run this example in "ingestion time"
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<MqttMessage> temperatureStream = env.addSource(new FlinkMqttConsumer("topic-edgent"));
        DataStream<Tuple2<Double, Double>> parameterStream = env.addSource(new FlinkMqttConsumer("topic-parameter"))
                .map(new ParameterMapper());

        DataStream<MqttMessage> filteredStream = temperatureStream.connect(parameterStream.broadcast())
                .flatMap(new DynamicFilterCoFlatMapper());

        filteredStream.print();

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        env.execute("SensorsDynamicFilterMqttEdgentQEP");
    }

    public static class DynamicFilterCoFlatMapper
            implements CoFlatMapFunction<MqttMessage, Tuple2<Double, Double>, MqttMessage> {

        private static final long serialVersionUID = -8634404029870404558L;
        private Tuple2<Double, Double> range = new Tuple2<Double, Double>(-1000.0, 1000.0);

        @Override
        public void flatMap1(MqttMessage value, Collector<MqttMessage> out) throws Exception {

            double payload = Double.parseDouble(value.getPayload());

            if (payload >= this.range.f0 && payload <= this.range.f1) {
                out.collect(value);
            }
        }

        @Override
        public void flatMap2(Tuple2<Double, Double> value, Collector<MqttMessage> out) throws Exception {
            this.range = value;
        }
    }

    public static class ParameterMapper implements MapFunction<MqttMessage, Tuple2<Double, Double>> {

        private static final long serialVersionUID = 7322348505833012711L;

        @Override
        public Tuple2<Double, Double> map(MqttMessage value) throws Exception {
            String[] array = value.getPayload().split(",");
            double min = Double.parseDouble(array[0]);
            double max = Double.parseDouble(array[1]);
            return new Tuple2<Double, Double>(min, max);
        }
    }
}
...