pyspark textFileStreaming не может обнаружить текстовый файл, пока работает textFile - PullRequest
1 голос
/ 18 июня 2019

Объяснить мой вопрос иначе: Этот вопрос отличается от отмеченного. Во-первых, входной параметр уже является каталогом (это правильно, но отмеченный вопрос неверен). Во-вторых, я скопировал TXT-файл в каталог во время потоковой передачи, чтобы имитировать поступление нового TXT-файла (поэтому вместо файлов, существующих в этом каталоге, создаются новые файлы)

Мой вопрос ниже


У меня есть каталог и текстовый файл /tmp/a.txt, содержимое в файле

aaa
bbb

Я использую pyspark и вручную копирую этот файл в один и тот же каталог непрерывно (во время потоковой работы файлы создаются одновременно)

def count(x):
    if x.isEmpty:
        print("empty")
        return
    print(x.count())

sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream("/tmp/").foreachRDD(count)

Выходные данные показывают, что СДР пусто

Однако я использую

c = sc.textFile("/tmp/").count()
print(c)

показывает, что c равно 2 (соответствует содержимому txt-файла)

Почему потоковая передача не работает?

Ответы [ 2 ]

0 голосов
/ 20 июня 2019

Я нашел решение в Scala (до сих пор не могу подобрать новые файлы в Python)

Во-первых, sc.textFile и sc.textFileStream принимают один и тот же параметр, который является именем каталога. Так что приведенный выше код является правильным.

Однако, разница в том, что sc.textFile может поднять файлы, если каталог существует (и он должен существовать, иначе InvalidInputException будет поднят), но в потоковом режиме sc.textFileStream (локальная файловая система) , он требует, чтобы каталог не существовал и не создавался потоковой программой, в противном случае новые файлы не могли бы быть подобраны (кажется, что это ошибка, существует только в локальной файловой системе, в HDFS, кажется, работает хорошо в соответствии с опытом других).

Более того, по опыту некоторых других они говорят, что если вы удалите каталог и перезапустите программу, то корзина также должна быть очищена.


Однако в python эта проблема все еще существует, и, пока в каталоге нет файлов, программа scala просто выдаст 0, но программа python выдаст предупреждение о

WARN FileInputDStream:87 - Error finding new files 
java.lang.NullPointerException

Вот мой код на python и scala, способ написания новых файлов одинаков, поэтому я не публикую его здесь

код питона:

if __name__ == "__main__":
    sc = SparkContext()    
    ssc = StreamingContext(sc, 3)
    ssc.textFileStream(path).foreachRDD(lambda x: print(x.count()))
    ssc.start()
    ssc.awaitTermination()

код скалы:

def main(args: Array[String]): Unit = {  
  val sc = new SparkContext()
  val ssc = new StreamingContext(sc, Seconds(3))
  ssc.textFileStream(params.inputPath).foreachRDD { x =>
    print(x.count())
  }
  ssc.start()
  ssc.awaitTermination()
}
0 голосов
/ 19 июня 2019

Вы пытаетесь выбрать новые строки, добавляемые в файл /tmp/a.txt, или вы пытаетесь выбрать новые файлы, добавляемые в каталог tmp?

Если последняя, ​​попробуйте заменить последнюю строку на эту

ssc.textFileStream("/tmp/*").foreachRDD(count)

...