Как вызвать скрипт Python всякий раз, когда встречается новый файл при использовании структурированной потоковой передачи с Scala Spark - PullRequest
0 голосов
/ 03 июля 2019

Я пытаюсь передавать данные с s3 и хранить их локально.Я передам получить имя вновь созданного файла, а затем передам имя файла скрипту Python, который сделает прогноз.Но когда я упоминаю о вызове оператора python, он вызывается, даже если он не получает никаких входных данных, из-за которых он не говорит никаких входных данных.Это потому, что файл еще не был найден в потоке.

Я попробовал какой-то код, и он показывает:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

ниже приведен код, который я пробовал:

if(!df.isEmpty){
val csvfile = getListOfFiles(output) // returns a file ending with csv

val csvlist = spark.sparkContext.makeRDD(csvfile)
val returned = csvlist.pipe("/home/admin1/IdeaProjects/StockPricePrediction/src/main/python/Predictor.py")
}

Также я хотел вызвать сценарий оболочки после завершения обработки файла python.

//function to get check csv file in dir and sent the name
def getListOfFiles(dir: String): List[String] = {
val file = new File(dir)
file.listFiles.filter(_.isFile)
  .filter(_.getName.endsWith(".csv"))
  .map(_.getPath).toList
}
  val ds = spark.readStream.option("sep", ",").format("csv")
  .option("thousands",",")
  .schema(schema)
  .option("header",true)
  .load(path)

 val df = ds.select("*")

В нижней строке будет создан новый файл csv с найденными данными

df.writeStream.outputMode("append")
    .format("csv")
    .option("checkpointLocation", "/home/admin1/IdeaProjects/StockPricePrediction/src/checkpoint")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start(output)

//now it begins to call the python script
val csvfile = getListOfFiles(output)
val csvlist = spark.sparkContext.makeRDD(csvfile)
val returned = csvlist.pipe("/home/admin1/IdeaProjects/StockPricePrediction/src/main/python/Predictor.py")

После выполненияПи-файл готов, он сохраняет прогноз локально, я отправляю прогнозы на s3 с помощью cmd, а затем удаляю файлы, которые были сохранены во время потоковой передачи, чтобы я мог использовать то же самое для следующего файла, который будет передаваться в потоковом режиме

val cmd = new CommandLine("/home/admin1/IdeaProjects/StockPricePrediction/src/main/shellscript.sh")

cmd.addArgument(arg1)
cmd.addArgument("s3a://bucketname/directoryname/")
cmd.addArgument("path for a fle to be deleted")

val exec = new DefaultExecutor()
exec.setWorkingDirectory(FileUtils.getUserDirectory())
exec.execute(cmd)

streaming.awaitTermination()

Любая помощь будет оценена.Спасибо.

...