Я хотел бы создать приложение для структурированной потоковой передачи, целью которого является получение изображений с URL-адреса и создание довольно простой модели ML, которая будет выполнять классификацию на основе содержимого изображения.
У меня есть URL (http://129.102.240.235/extraits/webcam/webcam.jpg), который обновляется каждую единицу времени X новым изображением. Моей целью было бы сначала сохранить эти изображения или импортировать их напрямую, используя объект readStream (есливозможно ли это?). Я знаю, что, начиная с Spark 2.X, мы можем напрямую использовать формат image
для чтения содержимого в Dataframe. Я колебался между разными подходами:
- , используя решение для шины сообщения(как Kafka), который будет производить мой контент для использования в Spark, я подумал, что это будет неплохо, потому что Kafka можно использовать для репликации файлов, чтобы потеря данных была слабее.
- Непосредственно используйте readStreamобъект для чтения изображения (это то, что я пытался сделать, см. ниже)
Цель моего следующего кода Scala - просто показать содержимое изображения, но при тестировании он выдает разные ошибкииспользуя spark-shell
, я прокомментирую ошибки в соответствующей части моего кода ниже.
scala> val url = "http://129.102.240.235/extraits/webcam/webcam.jpg"
url: String = http://129.102.240.235/extraits/webcam/webcam.jpg
scala> spark.sparkContext.addFile(url)
scala> val image_df = spark.read.format("image").load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [image: struct<origin: string, height: int ... 4 more fields>]
scala> image_df.select("image.origin").show(false)
19/10/25 13:33:26 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: File /tmp/spark-28741963-fd2d-44c2-8a6b-a489fdaae96d/userFiles-95b99fde-a1e2-4da6-9a17-382bfd2292c4/webcam.jpg exists and does not ma
tch contents of http://129.102.240.235/extraits/webcam/webcam.jpg
Я также пытался использовать readStream:
scala> val scheme = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"
scheme: String = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"
scala> val image_df = spark.readStream.format("image").schema(scheme).load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [origin: string, height: int ... 4 more fields]
scala> val query_show = image_df.collect.foreach(println).writeStream.format("console").start()
<console>:26: error: value writeStream is not a member of Unit
val query_show = image_df.collect.foreach(println).writeStream.format("console").start()
// Based on what I red on StackO question, I suppose that this error might be caused because
// .writeStream should be on the next line, so I tried to put it on 2 lines but..
scala> val query_show = image_df.collect.foreach(println).
| writeStream.format("console").start()
<console>:27: error: value writeStream is not a member of Unit
possible cause: maybe a semicolon is missing before `value writeStream'?
writeStream.format("console").start()
// Also tried without declaring query_show but returns the same error..
// I know that if I make it work I will have to add query_show.awaitTermination()
AnБуду очень признателен за помощь в отладке этого кода или за идею построения моего потокового конвейера!