Работа с ядовитыми сообщениями кафки во flink с сохранением порядка на разделах - PullRequest
1 голос
/ 29 мая 2020

Насколько я могу судить, при десериализации объектов с использованием KafkaDeserializationSchema [T] у меня есть 3 варианта: вернуть T, вернуть null (игнорировать запись) или выбросить исключение (закрыть диспетчер задач) [from: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the -схема десериализации] . У меня есть требование прекратить обработку последующих сообщений на topi c, если подозрительное сообщение не проходит десериализацию, но только до тех пор, пока человек не вмешается и не примет решение, игнорировать ли сообщение или заменить его исправленным.

Кому-нибудь приходилось иметь дело с подобным требованием?

Я думал о введении отдельной функции процесса для преобразования массива байтов в T, подключения к нему широковещательного потока и реагирование на команды от человека-оператора во всех случаях этого оператора. Проблема здесь в том, что я не могу придумать способ приостановить чтение из кафки, пока система ждет, пока человек примет решение. Я мог бы генерировать исключения и перезапускать бесконечно, или я мог бы продолжать чтение из topi c и удерживать входящие сообщения в состоянии, но меня беспокоит дополнительное использование ЦП и состояние шаров для вариантов 1 и 2 соответственно.

Есть какие-нибудь мысли? Спасибо!

...