Я могу разработать конвейер, который читает из kafka, выполняет некоторые преобразования и записывает вывод в приемник kafka, а также в приемник parque. Я хотел бы добавить эффективную регистрацию для регистрации промежуточных результатов преобразования, как в обычном потоковом приложении.
Один из вариантов, который я вижу, это войти в систему queryExecutionstreams
через
df.queryExecution.analyzed.numberedTreeString
или
logger.info("Query progress"+ query.lastProgress)
logger.info("Query status"+ query.status)
Но, похоже, нет способа увидеть специфичные для бизнеса сообщения, на которых работает поток.
Есть ли способ, как я могу добавить больше информации о журнале, например данные, которые он обрабатывает?