Кафка Потоки очереди писем / тема карантина - PullRequest
0 голосов
/ 30 ноября 2018

Мы создаем приложение kafka-streams как часть большой архитектуры микросервисов.Мы хотим быть устойчивыми к обратным несовместимым изменениям формата и ввели карантинную тему.Мы не смогли найти ничего, предоставленного библиотекой, поэтому мы вроде как свернули свое собственное, просто «вручную», пытаясь десериализовать запись и переслать ее в карантинную тему при сбое.

Easy peasy.

Теперь идет воспроизведение событий на карантине.Это должно быть запущено извне (скажем, вызовом REST) ​​и переместить события в следующую тему, если десериализация прошла успешно.Можем ли мы использовать kafka-streams для выполнения такой операции по требованию?Интуитивно понятно, что все должно быть просто, как builder.stream(quarantined).to(nextTopic).

. Глядя на API процессора, кажется, что остановить процесс невозможно.Прямая блокировка не возможна, так как это повлияет на другие задачи, выполняющиеся в том же StreamThread, и наличие другого приложения KafkaStream кажется излишним.Я хотел бы избежать ручного кодирования цикла потребитель -> производитель, поэтому я также рассматриваю kakka akka-stream, но это тоже звучит немного излишне ...

Есть идеи?

1 Ответ

0 голосов
/ 03 декабря 2018

Если я правильно понимаю ваш вопрос: всякий раз, когда запускается внешний вызов REST, вы хотите запустить отдельное потоковое приложение для чтения из карантинной темы B, пытаясь десериализовать данные в каком-то обновленном формате и, если это удастся, нажать натема «хорошие данные» C, и это потоковое приложение должно автоматически останавливаться, когда оно достигает конца темы B.

В этом случае, и, если у вас нет требований к порядку оформления в последней теме C, вывнутренне может использовать «флаг остановки», который поток вызывающих абонентов KafkaStreams может блокировать и ожидать, в то время как внутренний поток потока KafkaStreams может установить разблокировку потоков вызывающих абонентов, чтобы в конечном итоге вызвать «KafkaStreams.close ()».Например, вы можете использовать функцию пунктуации, которая проверяет, нет ли новых данных после последнего периода пунктуации, указывая, что мы, вероятно, исчерпали все данные из темы B, и в этом случае установите флаг.

Пример можно найти в собственном коде бенчмаркинга Streams: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java#L657-L673, но обратите внимание, что он основан не на пунктуации, а на логике обработки, проверяя текущее обработанное содержимое данных, поскольку он точно знает, как будет выглядеть «последняя запись»,Но общая идея использования такой отключающей защелки та же.

...