Флинк кафка к S3 с повтором - PullRequest
0 голосов
/ 05 марта 2020

Я новичок во Flink, у меня есть требование читать данные из Kafka, обогащать эти данные условно (если запись принадлежит категории X) с помощью некоторого API и писать в S3.

Я создал приложение "Привет, мир!" С вышеупомянутой логикой c, которая работает как шарм.

Но API, который я использую для обогащения, не имеет SLA 100% времени безотказной работы , поэтому мне нужно спроектировать что-то с помощью retry logi c.

Ниже приведены найденные мной опции,

Вариант 1) Сделайте экспоненциальную повторную попытку, пока я не получу ответ от API, но это заблокирует очередь, поэтому мне не нравится

Вариант 2) Используйте еще один topi c (называемый topi c -failure) и опубликуйте sh его в topi c - сбой, если API не работает. Таким образом, он не заблокирует текущую основную очередь. Мне понадобится еще один работник для обработки данных из очереди topi c -failure. Опять же, эта очередь должна использоваться как круговая очередь, если API не работает в течение длительного времени. Например, прочитайте сообщение из очереди topi c -failure, попробуйте выполнить обогащение, если ему не удается добавить pu sh в ту же очередь с именем topi c -failure, и получите следующее сообщение из очереди topi c -failure .

Я предпочитаю вариант 2, но, похоже, не так просто выполнить 1025 *. Существует ли какой-либо стандартный подход Flink для реализации варианта 2?

1 Ответ

1 голос
/ 05 марта 2020

Это довольно распространенная проблема, возникающая при миграции с микросервисов. Правильным решением было бы иметь данные поиска также в Kafka или в некоторой БД, которая могла бы быть интегрирована в то же приложение Flink, что и дополнительный источник.

Если вы не можете сделать это (например, API является внешним или данными не может быть легко сопоставлен с хранилищем данных), оба подхода жизнеспособны и имеют разные преимущества.

1) Позволит вам сохранить порядок входных событий. Если ваше нижестоящее приложение ожидает упорядоченности, вам нужно повторить попытку.

2) Обычным термином является очередь недоставленных сообщений (хотя чаще используется для недействительных записей). Есть два простых способа интегрировать это в Flink: либо иметь отдельный источник, либо использовать шаблон / список топи c с одним источником.

Ваша топология будет выглядеть следующим образом:

Kafka Source      -\             Async IO        /-> Filter good -> S3 sink
                    +-> Union -> with timeout  -+ 
Kafka Source dead -/           (for API call!)   \-> Filter bad  -> Kafka sink dead
...