Динамическое изменение пути записи hdf в потоковой передаче с искрой - PullRequest
1 голос
/ 12 апреля 2019

У меня есть потоковое потоковое приложение, которое читает данные из kafka и записывает их в hdfs.Я хочу изменить путь записи hdfs динамически в зависимости от текущей даты, но кажется, что структурированная потоковая передача не работает таким образом.Он просто создает одну папку с датой, когда приложение было запущено, и продолжает запись в ту же папку, даже если дата изменяется.Есть ли способ, которым я могу изменить путь динамически на основе текущей даты?

Ниже показано, как мой писатель выглядит как

 val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
 val outPath = "maindir/sb_topic/data/loaddate="

val dswWriteStream =dfresult.writeStream
    .outputMode(outputMode) 
    .format(writeformat) 
    .option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
    .option("checkpointLocation", checkpointdir) 
    .option("maxRecordsPerFile", 999999999) 
    .trigger(Trigger.ProcessingTime("10 minutes")) 

1 Ответ

0 голосов
/ 16 апреля 2019

Решение: Я решил эту проблему, добавив столбец текущей даты (например, «loaddate») к родительскому фрейму данных «dfresult», а затем разделил записывающий поток по этому столбцу.

dswWriteStream.partitionBy('loaddate')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...