FLINK, событие запуска на основе JSON динамики c входных данных (например, данных объекта карты) - PullRequest
1 голос
/ 10 июля 2020

Я хотел бы знать, может ли FLINK удовлетворить мои требования. Я просмотрел много статей, но не уверен, может ли мой случай быть решен или нет

Случай: у меня есть два источника ввода. a) Событие b) Пример данных события ControlSet:

event 1-
{
   "id" :100
   "data" : {
             "name" : "abc"
            }
}

event 2-
{
   "id" :500
   "data" : {
             "date" : "2020-07-10";
             "name" : "event2"
            }
}

, если вы видите, что событие-1 и событие-2 имеют разные атрибуты в «данных». поэтому считайте, что данные представляют собой поле произвольной формы, а имя атрибута может быть таким же / другим.

ControlSet даст нам инструкцию для выполнения триггера. например, условие триггера может быть таким, как

(id = 100 && name = abc) OR (id =500 && date ="2020-07-10")

, пожалуйста, помогите мне, если такой сценарий можно запустить во flink, и что может быть лучшим способом. Я не думаю, что здесь могут помочь patternCEP или SQL, и я не уверен, что событие dataStream может быть как объект JSON и может быть запросом типа JSON path на этом.

1 Ответ

1 голос
/ 11 июля 2020

Да, это можно сделать с помощью Flink. И CEP и SQL не помогают, так как они требуют, чтобы шаблон был известен во время компиляции.

Для потока событий я предлагаю ввести этот поток в ключ по идентификатору и сохранить атрибут / значения данных в ключевом MapState, что является разновидностью ключевого состояния, которое Flink знает, как управлять, проверять, восстанавливать и масштабировать по мере необходимости. Это дает нам распределенную карту, отображающую идентификаторы в карты ha sh, содержащие данные для каждого идентификатора.

Для потока управления позвольте мне сначала описать решение для упрощенной версии, в которой запросы управления относятся к form

(id == key) && (attr == value)

Мы можем просто указать этот поток с помощью идентификатора в запросе (например, key ) и подключить этот поток к потоку событий. Мы будем использовать RichCoProcessFunction для хранения описанного выше MapState, и по мере поступления этих запросов мы можем посмотреть, какие данные у нас есть для ключа , и проверить, map[attr] == value.

Чтобы обрабатывать более сложные запросы, такие как тот, что в вопросе

(id1 == key1 && attr1 == value1) OR (id2 == key2 && attr2 == value2)

, мы можем сделать что-то более сложное.

Здесь нам нужно будет присвоить уникальный идентификатор каждому контрольному запросу.

Один из подходов - широковещательная передача этих запросов на KeyedBroadcastProcessFunction, который снова содержит описанное выше MapState. В методе processBroadcastElement каждый экземпляр может использовать applyToKeyedState для проверки допустимости компонентов запроса, для которых этот экземпляр сохраняет состояние с ключом (пары атрибут / значение, полученные из поля данных в четном потоке) . Для каждого ключевого компонента запроса, где экземпляр может предоставить запрошенную информацию, он выдает результат ниже по течению.

Затем после KeyedBroadcastProcessFunction мы вводим поток по идентификатору контрольного запроса и используем KeyedProcessFunction чтобы собрать вместе все ответы от различных экземпляров KeyedBroadcastProcessFunction и определить окончательный результат сообщения управления / запроса.

На самом деле здесь нет необходимости использовать широковещательную рассылку, но я нашел эту схему немного проще объяснить. Но вместо этого вы можете направить копии запроса с ключами только в экземпляры RichCoProcessFunction, содержащего MapState для ключей, используемых в контрольном запросе, а затем выполнить такую ​​же сборку окончательного результата впоследствии.

Возможно, это было нелегко. То, что я предложил, включает составление двух методов, которые я закодировал ранее в примерах: https://github.com/alpinegizmo/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java - это пример, в котором широковещательная передача используется для запуска оценки предикатов запроса по состоянию с ключом, и https://gist.github.com/alpinegizmo/5d5f24397a6db7d8fabc1b12a15eeca6 - это пример, в котором уникальный идентификатор используется для повторной сборки одного ответа после параллельного выполнения нескольких обогащений.

...