Я думаю, вы неправильно поняли API преобразования . Согласно определению, преобразуйте каждую запись входного потока в ноль или более записей в выходном потоке.
Внутри Transformer,состояние получается через интерфейс ProcessorContext.Processor имеет метод init. Метод init () передает экземпляр ProcessorContext, который обеспечивает доступ к метаданным текущей обработанной записи, включая ее исходную тему и раздел Kafka, соответствующее смещение сообщенияи далее такая информация.
Теперь у нас есть доступ к контексту с использованием метода init, затем мы можем вызвать метод forward из контекста, но не внутри метода init.
Этот пример можно увидеть, чтобы понять .Надеюсь, это сработает для вас.