У меня есть потоковое потоковое приложение, которое читает данные из kafka и записывает их в hdfs.Я хочу изменить путь записи hdfs динамически в зависимости от текущей даты, но кажется, что структурированная потоковая передача не работает таким образом.Он просто создает одну папку с датой, когда приложение было запущено, и продолжает запись в ту же папку, даже если дата изменяется.Есть ли способ, которым я могу изменить путь динамически на основе текущей даты?
Ниже показано, как мой писатель выглядит как
val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
val outPath = "maindir/sb_topic/data/loaddate="
val dswWriteStream =dfresult.writeStream
.outputMode(outputMode)
.format(writeformat)
.option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
.option("checkpointLocation", checkpointdir)
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime("10 minutes"))