У меня есть текстовый файл HDFS, каждая строка которого является URL-адресом, с которого я могу загрузить изображение.Я хочу загрузить их и сохранить в HDFS с каждым изображением в виде одного файла, и я хочу запустить его на кластере распространения, а не только на узле драйвера.
Вот мой код:
val conf = new SparkConf().setAppName("test")
val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
val rdd = sc.textFile(PIC_URL_PATH, 10) //each line in PIC_URL_PATH is an URL, which I can download picture from.
rdd.foreachPartition(part => {
part.foreach(urlPath => {
val fileSystem = FileSystem.get(hadoopConf) //hadoopConf is not serializable!
val urlLink = new URL(urlPath)
val con = urlLink.openConnection
val is = con.getInputStream
val bs = new Array[Byte](1024)
var len = 0
val fsos = fileSystem.create(new Path(urlPath.substring(urlPath.lastIndexOf("/") + 1)))
while ( (len = is.read(bs)) != -1) fsos.write(bs, 0, len)
fsos.close()
is.close()
})
})
Я читаю текстовый файл как RDD.Затем для каждого раздела я скачал картинку с заданного URL-адреса и попытался использовать FileSystem для сохранения картинки (каждый является потоком) в HDFS.Но класс Configuration не сериализуем.
Есть ли способ решить проблему?