Я пытаюсь передавать данные с s3 и хранить их локально.Я передам получить имя вновь созданного файла, а затем передам имя файла скрипту Python, который сделает прогноз.Но когда я упоминаю о вызове оператора python, он вызывается, даже если он не получает никаких входных данных, из-за которых он не говорит никаких входных данных.Это потому, что файл еще не был найден в потоке.
Я попробовал какой-то код, и он показывает:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
ниже приведен код, который я пробовал:
if(!df.isEmpty){
val csvfile = getListOfFiles(output) // returns a file ending with csv
val csvlist = spark.sparkContext.makeRDD(csvfile)
val returned = csvlist.pipe("/home/admin1/IdeaProjects/StockPricePrediction/src/main/python/Predictor.py")
}
Также я хотел вызвать сценарий оболочки после завершения обработки файла python.
//function to get check csv file in dir and sent the name
def getListOfFiles(dir: String): List[String] = {
val file = new File(dir)
file.listFiles.filter(_.isFile)
.filter(_.getName.endsWith(".csv"))
.map(_.getPath).toList
}
val ds = spark.readStream.option("sep", ",").format("csv")
.option("thousands",",")
.schema(schema)
.option("header",true)
.load(path)
val df = ds.select("*")
В нижней строке будет создан новый файл csv с найденными данными
df.writeStream.outputMode("append")
.format("csv")
.option("checkpointLocation", "/home/admin1/IdeaProjects/StockPricePrediction/src/checkpoint")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start(output)
//now it begins to call the python script
val csvfile = getListOfFiles(output)
val csvlist = spark.sparkContext.makeRDD(csvfile)
val returned = csvlist.pipe("/home/admin1/IdeaProjects/StockPricePrediction/src/main/python/Predictor.py")
После выполненияПи-файл готов, он сохраняет прогноз локально, я отправляю прогнозы на s3 с помощью cmd, а затем удаляю файлы, которые были сохранены во время потоковой передачи, чтобы я мог использовать то же самое для следующего файла, который будет передаваться в потоковом режиме
val cmd = new CommandLine("/home/admin1/IdeaProjects/StockPricePrediction/src/main/shellscript.sh")
cmd.addArgument(arg1)
cmd.addArgument("s3a://bucketname/directoryname/")
cmd.addArgument("path for a fle to be deleted")
val exec = new DefaultExecutor()
exec.setWorkingDirectory(FileUtils.getUserDirectory())
exec.execute(cmd)
streaming.awaitTermination()
Любая помощь будет оценена.Спасибо.