Я создал преобразователь, который выполняет некоторые преобразования на ConnectRecord
для использования в Kafka Connect.
Трансформатор работает нормально, но я хочу, чтобы приемник не использовал приемник сообщений наконкретная тема при обнаружении определенного критерия (ошибки).
На мгновение код выдает исключение, останавливая работника.Несмотря на свою эффективность, это потенциально также влияет на другие темы и выглядит как последнее средство.
Есть ли способ остановить / приостановить потребление темы из кода Kafka Connect Transformation
хорошим способом?
class ApplySchemaTransformation[T <: ConnectRecord[T]]
extends Transformation[T]
with ContentTypeHandler[T] {
override def apply(record: T): T = {
if ([some criteria]) {
[ok code]
} else {
[stop consumer]
}
}
override def config(): ConfigDef = ...
override def close(): Unit = {}
override def configure(configs: java.util.Map[String, _]): Unit = ...
}