процесс кафки топи c один за другим - PullRequest
0 голосов
/ 17 апреля 2020

Я новичок в kafka.

В нашем проекте есть исходная система, которая собирается публиковать различные типы json сообщений (type1, type2) в разных темах. у нас есть весенние микро-сервисы с слушателями кафки, которые слушают эти темы. нам нужно преобразовать исходное сообщение в другой формат json и отправить его в другой API REST, но здесь есть проблема, что мы должны обработать ВСЕ сообщения type1 перед отправкой сообщений typ2. В случае, если мы отправим type2 раньше, чем type1, мы получим ошибки от REST API, так как он не может обработать type2, не имея type1.

у нас есть параметр в каждом сообщении, который сообщает нам, является ли сообщение type1 или type2

мы думали сделать это, используя эти подходы. я все еще читаю о весенней кафке и даже не знаю, что это возможно, поэтому, пожалуйста, простите, если что-то глупо или невозможно.

  1. сначала прочитайте все сообщения и обработайте type1 перед обработкой type2 (сохраните сообщение в самой кафке топи c). Это возможно ? это похоже на сообщение типа 2, теперь ничего не делать.
  2. читать каждое сообщение и обрабатывать только тип 1, а если это тип 2, оставить его в db для последующей обработки после завершения всех сообщений типа 1. Для этого мне нужно знать, обрабатывается ли все сообщение type1. Как мне это проверить?
  3. Мысль о том, чтобы попросить команду исходного кода обработать сообщения типа 1 в частности topi c и ввести сообщения 2 в другом topi c. С весенней стороны: один список для чтения тем типа 1 и другой один для типа 2 темы. Приостановить прослушивание type2 по умолчанию. сначала обработайте темы типа 1, и если это будет сделано, возобновите прослушиватель типа 2.

или, если вы можете помочь мне в более эффективном подходе, я буду очень признателен.

Пожалуйста, помогите мне и дайте Я знаю, если вам нужны какие-либо дополнительные детали. Заранее спасибо.

1 Ответ

0 голосов
/ 18 апреля 2020

Самый простой и самый эффективный метод - это использование 2 разных тем / слушателей и управление жизненным циклом контейнеров слушателей (например, используя события незанятого контейнера, чтобы узнать, когда все type1 были обработаны).

См. этот вопрос для примера того, что и документация .

1 тоже будет работать; просто перемотайте (отыщите) разделы в исходное положение и повторите обработку с того же места. См. документацию .

...