Как кэшировать локальную переменную на уровне процесса в потоковой передаче Flink? - PullRequest
1 голос
/ 11 декабря 2019

Внутри экземпляра задачи Flink Мне нужен доступ к удаленному веб-сервису, чтобы получить некоторые данные при наступлении события, однако я не хочу получать доступ к удаленному веб-сервису каждый раз, когда происходит событие, поэтому мне нужно кэшировать данные в локальной памятии могут быть доступны все задачи процесса, как это сделать? хранить данные в статической закрытой переменной на уровне класса?

Например, в следующем примере, если установить локальную переменную localCache в классе Splitter, она кэшируется на уровне оператора, а не на уровне процесса.

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        ***private object localCache ;***

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

Ответы [ 2 ]

0 голосов
/ 12 декабря 2019

Масштабируемый подход может использовать оператор источника для фактического выполнения вызова веб-службы, а затем записать результат в поток. Затем вы можете получить доступ к этому потоку в качестве широковещательного потока для своего оператора, в результате чего один объект (результат веб-вызова), отправленный в широковещательный поток, будет отправлен каждому экземпляру принимающего оператора. Это поделится результатом этого единственного веб-вызова на всех машинах и JVM в вашем кластере. Вы также можете сохранить широковещательное состояние и поделиться им с новыми экземплярами своего оператора по мере увеличения кластера.

0 голосов
/ 12 декабря 2019

Точно так же, как вы сказали. Вы бы использовали статическую переменную в RichFlatMapFunction и инициализировали ее в open. open будет вызываться в каждом TaskManager перед подачей в любую запись. Обратите внимание, что для каждого отдельного слота создается экземпляр Splitter, поэтому в большинстве случаев в одном TaskManager есть несколько экземпляров Splitter. Таким образом, вам нужно остерегаться двойного создания.

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private transient Object localCache;


    @Override
    public void open(Configuration parameters) throws Exception {
        if (localCache == null)
            localCache = ... ;
    }

    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
...