Да, это можно сделать с помощью Flink. И CEP и SQL не помогают, так как они требуют, чтобы шаблон был известен во время компиляции.
Для потока событий я предлагаю ввести этот поток в ключ по идентификатору и сохранить атрибут / значения данных в ключевом MapState
, что является разновидностью ключевого состояния, которое Flink знает, как управлять, проверять, восстанавливать и масштабировать по мере необходимости. Это дает нам распределенную карту, отображающую идентификаторы в карты ha sh, содержащие данные для каждого идентификатора.
Для потока управления позвольте мне сначала описать решение для упрощенной версии, в которой запросы управления относятся к form
(id == key) && (attr == value)
Мы можем просто указать этот поток с помощью идентификатора в запросе (например, key ) и подключить этот поток к потоку событий. Мы будем использовать RichCoProcessFunction
для хранения описанного выше MapState, и по мере поступления этих запросов мы можем посмотреть, какие данные у нас есть для ключа , и проверить, map[attr] == value
.
Чтобы обрабатывать более сложные запросы, такие как тот, что в вопросе
(id1 == key1 && attr1 == value1) OR (id2 == key2 && attr2 == value2)
, мы можем сделать что-то более сложное.
Здесь нам нужно будет присвоить уникальный идентификатор каждому контрольному запросу.
Один из подходов - широковещательная передача этих запросов на KeyedBroadcastProcessFunction
, который снова содержит описанное выше MapState. В методе processBroadcastElement
каждый экземпляр может использовать applyToKeyedState
для проверки допустимости компонентов запроса, для которых этот экземпляр сохраняет состояние с ключом (пары атрибут / значение, полученные из поля данных в четном потоке) . Для каждого ключевого компонента запроса, где экземпляр может предоставить запрошенную информацию, он выдает результат ниже по течению.
Затем после KeyedBroadcastProcessFunction
мы вводим поток по идентификатору контрольного запроса и используем KeyedProcessFunction
чтобы собрать вместе все ответы от различных экземпляров KeyedBroadcastProcessFunction
и определить окончательный результат сообщения управления / запроса.
На самом деле здесь нет необходимости использовать широковещательную рассылку, но я нашел эту схему немного проще объяснить. Но вместо этого вы можете направить копии запроса с ключами только в экземпляры RichCoProcessFunction
, содержащего MapState для ключей, используемых в контрольном запросе, а затем выполнить такую же сборку окончательного результата впоследствии.
Возможно, это было нелегко. То, что я предложил, включает составление двух методов, которые я закодировал ранее в примерах: https://github.com/alpinegizmo/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java - это пример, в котором широковещательная передача используется для запуска оценки предикатов запроса по состоянию с ключом, и https://gist.github.com/alpinegizmo/5d5f24397a6db7d8fabc1b12a15eeca6 - это пример, в котором уникальный идентификатор используется для повторной сборки одного ответа после параллельного выполнения нескольких обогащений.