Я предлагаю вам использовать ExecuteSQL.Вы можете установить запрос из атрибута или составить его, используя атрибут.Самый простой способ - создать JSON, а затем проанализировать этот JSON и создать атрибуты.Посмотрите на этот пример. Вот sql, созданный из файла, который вы можете настроить, чтобы создать его из kafka link