Я нашел решение в Scala (до сих пор не могу подобрать новые файлы в Python)
Во-первых, sc.textFile
и sc.textFileStream
принимают один и тот же параметр, который является именем каталога. Так что приведенный выше код является правильным.
Однако, разница в том, что sc.textFile
может поднять файлы, если каталог существует (и он должен существовать, иначе InvalidInputException
будет поднят), но в потоковом режиме sc.textFileStream
(локальная файловая система) , он требует, чтобы каталог не существовал и не создавался потоковой программой, в противном случае новые файлы не могли бы быть подобраны (кажется, что это ошибка, существует только в локальной файловой системе, в HDFS, кажется, работает хорошо в соответствии с опытом других).
Более того, по опыту некоторых других они говорят, что если вы удалите каталог и перезапустите программу, то корзина также должна быть очищена.
Однако в python эта проблема все еще существует, и, пока в каталоге нет файлов, программа scala просто выдаст 0
, но программа python выдаст предупреждение о
WARN FileInputDStream:87 - Error finding new files
java.lang.NullPointerException
Вот мой код на python и scala, способ написания новых файлов одинаков, поэтому я не публикую его здесь
код питона:
if __name__ == "__main__":
sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream(path).foreachRDD(lambda x: print(x.count()))
ssc.start()
ssc.awaitTermination()
код скалы:
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val ssc = new StreamingContext(sc, Seconds(3))
ssc.textFileStream(params.inputPath).foreachRDD { x =>
print(x.count())
}
ssc.start()
ssc.awaitTermination()
}