Spark Structured Streaming: импорт файлов изображений для создания простого приложения ML - PullRequest
0 голосов
/ 25 октября 2019

Я хотел бы создать приложение для структурированной потоковой передачи, целью которого является получение изображений с URL-адреса и создание довольно простой модели ML, которая будет выполнять классификацию на основе содержимого изображения.

У меня есть URL (http://129.102.240.235/extraits/webcam/webcam.jpg), который обновляется каждую единицу времени X новым изображением. Моей целью было бы сначала сохранить эти изображения или импортировать их напрямую, используя объект readStream (есливозможно ли это?). Я знаю, что, начиная с Spark 2.X, мы можем напрямую использовать формат image для чтения содержимого в Dataframe. Я колебался между разными подходами:

  • , используя решение для шины сообщения(как Kafka), который будет производить мой контент для использования в Spark, я подумал, что это будет неплохо, потому что Kafka можно использовать для репликации файлов, чтобы потеря данных была слабее.
  • Непосредственно используйте readStreamобъект для чтения изображения (это то, что я пытался сделать, см. ниже)

Цель моего следующего кода Scala - просто показать содержимое изображения, но при тестировании он выдает разные ошибкииспользуя spark-shell, я прокомментирую ошибки в соответствующей части моего кода ниже.

scala> val url = "http://129.102.240.235/extraits/webcam/webcam.jpg"
url: String = http://129.102.240.235/extraits/webcam/webcam.jpg

scala> spark.sparkContext.addFile(url)

scala> val image_df = spark.read.format("image").load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [image: struct<origin: string, height: int ... 4 more fields>]

scala> image_df.select("image.origin").show(false)
19/10/25 13:33:26 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: File /tmp/spark-28741963-fd2d-44c2-8a6b-a489fdaae96d/userFiles-95b99fde-a1e2-4da6-9a17-382bfd2292c4/webcam.jpg exists and does not ma
tch contents of http://129.102.240.235/extraits/webcam/webcam.jpg

Я также пытался использовать readStream:

scala> val scheme = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"
scheme: String = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"

scala> val image_df = spark.readStream.format("image").schema(scheme).load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [origin: string, height: int ... 4 more fields]

scala> val query_show = image_df.collect.foreach(println).writeStream.format("console").start()
<console>:26: error: value writeStream is not a member of Unit
       val query_show = image_df.collect.foreach(println).writeStream.format("console").start()

// Based on what I red on StackO question, I suppose that this error might be caused because 
// .writeStream should be on the next line, so I tried to put it on 2 lines but..

scala> val query_show = image_df.collect.foreach(println).
     |   writeStream.format("console").start()
<console>:27: error: value writeStream is not a member of Unit
possible cause: maybe a semicolon is missing before `value writeStream'?
         writeStream.format("console").start()

// Also tried without declaring query_show but returns the same error..
// I know that if I make it work I will have to add query_show.awaitTermination()

AnБуду очень признателен за помощь в отладке этого кода или за идею построения моего потокового конвейера!

1 Ответ

0 голосов
/ 25 октября 2019

Мне удается найти способ show() моего красного кадра данных с использованием формата "изображения". Я сделал это в 2 шага:
1 / запустил скрипт на Python, который сохранит изображение jpg из URL, скрипт:

import urllib.request
import shutil


filename = '~/dididi/bpi-spark/images'
url = "http://129.102.240.235/extraits/webcam/webcam.jpg"

with urllib.request.urlopen(url) as response, open(filename, 'ab') as out_file:  
    shutil.copyfileobj(response, out_file)

2 / Затем, используя spark-shell, я только что выполнилэти 2 строки:

val image_df = spark.read.format("image").option("inferSchema", true).load("bpi-spark/images").select($"image.origin",$"image.height",$"image.width", $"image.mode", $"image.data")

scala> image_df.show()
+--------------------+------+-----+----+--------------------+
|              origin|height|width|mode|                data|
+--------------------+------+-----+----+--------------------+
|file:///home/niki...|   480|  720|  16|[3C 3D 39 3C 3D 3...|
+--------------------+------+-----+----+--------------------+
...