FileStream не читает все существующие файлы из HDFS - PullRequest
0 голосов
/ 08 марта 2019

У меня есть требование для чтения файлов из папки HDFS. Я использую приведенный ниже код для чтения файлов. Он читает файлы, которые мы создали за последние 1 минуту, но не читает существующие файлы старше 1 минуты.

val filterF = new Function[Path, Boolean] {
def apply(x: Path): Boolean = {
  println("looking if "+x+" to be consider or not")
  val flag: Boolean = true
  return flag
}
}

def processStream(inputPath: String) = {

val messages = streamingContext.fileStream [LongWritable, Text, TextInputFormat]( "/user/cust/sample", filterF, false).map{case (x, y) => (y.toString)}
val words = messages.flatMap(_.split(" "))
val wordCount = words.map(rec => (rec, 1)).reduceByKey(_ + _)
wordCount.print()
}

Не могли бы вы помочь.

Спасибо

...