Что означает, что «широковещательное состояние» разблокирует реализацию функции «динамических шаблонов» для библиотеки CEP Флинка? - PullRequest
0 голосов
/ 26 мая 2018

Из объявления о выпуске Flink 1.5 мы знаем, что Flink теперь поддерживает «состояние широковещания», и было описано, что «состояние широковещания разблокирует реализацию функции« динамические шаблоны »для библиотеки CEP Flink».

Означает ли это, что в настоящее время мы можем использовать «широковещательное состояние» для реализации «динамических шаблонов» без Flink CEP?Также я понятия не имею, в чем разница при реализации «динамических шаблонов» для Flink CEP с или без состояния широковещания?Я был бы признателен, если кто-то может привести пример с кодом, чтобы объяснить разницу.

=============

Обновление для тестирования данных трансляции-поток оператором broadcast () с ключевым потоком данных

После тестирования в Flink 1.4.2 я обнаружил, что поток широковещательных данных (по старому оператору broadcast ()) может соединяться с потоком ключевых данных, ниже приведен тесткод, и мы нашли все события потока управления, передаваемые всем экземплярам оператора.Таким образом, кажется, что старая широковещательная рассылка () может выполнять те же функции, что и новое «широковещательное состояние».

public static void ConnectBroadToKeyedStream() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);

    List<Tuple1<String>>
            controlData = new ArrayList<Tuple1<String>>();
    controlData.add(new Tuple1<String>("DROP"));
    controlData.add(new Tuple1<String>("IGNORE"));
    DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);

    List<Tuple1<String>>
            dataStreamData = new ArrayList<Tuple1<String>>();
    dataStreamData.add(new Tuple1<String>("data"));
    dataStreamData.add(new Tuple1<String>("DROP"));
    dataStreamData.add(new Tuple1<String>("artisans"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));

    // DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
    DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

    DataStream<String> result = control
            .broadcast()
            .connect(keyedDataStream)
            .flatMap(new MyCoFlatMap());
    result.print();
    env.execute();
}

private static final class MyCoFlatMap
        implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
    HashSet blacklist = new HashSet();

    @Override
    public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
        blacklist.add(control_value);
        out.collect("listed " + control_value);
    }

    @Override
    public void flatMap2(Tuple1<String> data_value, Collector<String> out) {

        if (blacklist.contains(data_value)) {
            out.collect("skipped " + data_value);
        } else {
            out.collect("passed " + data_value);
        }
    }
}

Ниже приведен результат теста.

1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)

https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement

Ответы [ 2 ]

0 голосов
/ 06 июля 2018

Вот пример кода, который реализует как оригинальный метод широковещания flink без аргументов, так и недавно введенное состояние широковещания при flink 1.5.0 .https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed

Насколько я узнал, состояние широковещания может быть реализовано без flink cep, как в коде, показанном выше.

Оригинальный метод DataStream broadcastсоздаст DataStream вместо BroadcastConnectedStream.Это будет оригинальная coGroup схема проектирования.Мы могли бы использовать больше функции преобразования потока, определенной в ConnectedStreams, после соединения потока метрик с широковещательным потоком правил.Например, функция keyBy позволяет создать транслируемый поток и подключенный поток, которые имеют одинаковый ключ , будут process ed и прикреплены к тому же параллельному CoProcessFunction.Таким образом, CoProcessFunction может иметь собственное локальное хранилище.Функция может иметь собственную структуру данных в своем поле, отличную от состояния карты, доступ к которому осуществляется из ReadOnlyContext.

Состояние широковещания может быть реализовано методом broadcast с набором MapStateDescriptor, это означает, чтотранслируемый поток может быть связан с другим потоком много раз.Различные подключенные BroadcastConnectedStream могут совместно использовать свое собственное состояние широковещания с уникальной функцией MapStateDescriptor in process.

Я думал, что это будут ключевые различия между широковещательной передачей с аргументами on и широковещательным состоянием.

0 голосов
/ 26 мая 2018

Без состояния широковещания два потока данных Flink не могут обрабатываться вместе с сохранением состояния, если они не имеют одинаковый ключ.Широковещательный поток может быть подключен к потоку с ключами, но если вы попытаетесь использовать состояние с ключами в RichCoFlatMap, например, это не удастся.

Чаще всего желательно иметь возможность иметь один поток с динамическими «правилами», которые должны применяться к каждому событию в другом потоке, независимо от ключа.Должен существовать новый тип управляемого состояния Flink, в котором эти правила могут быть сохранены.* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *} * Теперь, когда эта функция включена, теперь можно работать с поддержкой динамических шаблонов в CEP.

...