Подключение BroadcastConnectedStream к AsyncIO - PullRequest
0 голосов
/ 19 июня 2020

Мне нужно заставить многофункциональную функцию AsyncIO выполнять вызовы на основе последнего набора правил. Для таких операций, как карта, я смог обработать BroadcastConnectedStream с помощью расширенной функции, следуя этому сообщению в блоге: https://flink.apache.org/2019/06/26/broadcast-state.html

Однако для создания функции AsyncIO требуется DataStream в качестве входа, который BroadcastConnectedStream не (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org / apache / flink / streaming / api / datastream / BroadcastConnectedStream. html)

Есть ли у кого-нибудь идеи, как я могу обойти это ограничение? Сценарий заключается в том, что я хочу, чтобы функция asyn c отображала sh входящие сообщения в состоянии, когда есть непреходящая ошибка с вызовом asyn c во внешний мир, и возобновляла операцию после «go -ahead "сообщение получено на кафке (хотя я мог бы сделать это с широковещательным потоком)

Ответы [ 2 ]

0 голосов
/ 20 июня 2020

Я думаю, что должна быть возможность поставить BroadcastProcessFunction (не ключевую) перед оператором ввода-вывода asyn c, но вам нужно будет объединить в другой поток (потоки), который вы обрабатываете, поскольку asyn c i / o имеет только один вход. Учитывая, насколько это уродливо, поиск другого способа передачи сигнала «go вперед» может быть предпочтительнее.

Или вы можете посмотреть Stateful Functions , который имеет большую гибкость в этой области.

0 голосов
/ 19 июня 2020

Итак, во-первых, AsyncFunction не поддерживает состояние Keyed, поэтому вам также придется обойти это и реализовать это самостоятельно с помощью CheckpointedFunction.

Как правило, я не думаю, что есть что-нибудь нестандартное, что можно использовать в этом случае. Лучшая идея, которую я могу придумать, если вы хотите использовать broadcast, - это использовать KeyedBroadcastProcessFunction для выдачи результатов по потоку, а затем использовать функцию AsyncIO. Если вы реализуете собственную обработку состояния, вы можете сохранить все неудачные результаты и просто повторить их.

Однако, просто получить все запросы в виде списка и перебирать их в цикле, чтобы повторить попытку, вероятно, не лучшая идея, поскольку это, вероятно, может вызвать снижение производительности (вы устанавливаете, сколько запросов должно выполняться одновременно, но это запрос будет длиться намного дольше, чем другие).

...