SparkContext.textFile
вызывает в некоторой точке FileInputFormat.setInputPaths(Job job, String commaSeparatedPaths)
, которая, очевидно, просто разделяется на ,
вход String
, представляющий пути через запятую:
Устанавливает заданные пути, разделенные запятыми, в качестве списка входных данных для задания уменьшения карты.
Один из способов обойти это ограничение состоит в использовании альтернативной сигнатуры setInputPaths
: FileInputFormat.setInputPaths(Job job, Path... inputPaths)
который принимает vararg
из Path
объектов.Таким образом, нет необходимости разбивать на ,
и, таким образом, путаница невозможна.
Для этого нам нужно будет создать наш собственный метод textFile
, который будет действовать точно так же, как SparkContext.textFile
:вызов объекта HadoopRDD
, но на этот раз с использованием ввода, предоставленного в виде List
из String
с вместо String
:
package org.apache.spark
import org.apache.spark.rdd.{RDD, HadoopRDD}
import org.apache.spark.util.SerializableConfiguration
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.fs.Path
object TextFileOverwrite {
implicit class SparkContextExtension(val sc: SparkContext) extends AnyVal {
def textFile(
paths: Seq[String],
minPartitions: Int = sc.defaultMinPartitions
): RDD[String] = {
val confBroadcast =
sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
val setInputPathsFunc =
(jobConf: JobConf) =>
FileInputFormat.setInputPaths(jobConf, paths.map(p => new Path(p)): _*)
new HadoopRDD(
sc,
confBroadcast,
Some(setInputPathsFunc),
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text],
minPartitions
).map(pair => pair._2.toString)
}
}
}
, который можно использовать следующим образом:
import org.apache.spark.TextFileOverwrite.SparkContextExtension
sc.textFile(Seq("path/hello,world.txt", "path/hello_world.txt"))
По сравнению с SparkContext.textFile
единственное отличие в реализации - это вызов FileInputFormat.setInputPaths
, который вводит Path
s вместо запятой String
.
Обратите внимание, что я использую пакет org.apache.spark
для хранения этой функции, потому что SerializableConfiguration
имеет видимость private[spark]
в базе кода искры.
Также обратите внимание на использование implicit class
в SparkContext
, котороепозволяет нам неявно присоединять этот дополнительный метод textFile
непосредственно к объекту SparkContext
и, следовательно, вызывать его, используя sc.textFile()
вместо того, чтобы передавать sparkContext
в качестве параметра метода.
Такжеобратите внимание, что я бы предпочел дать Seq[Path]
вместо Seq[String]
в качестве ввода этого метда, но Path
еще не Serializable
в текущей версии hadoop-common
, используемой Spark (она станет Serializable
начиная с версии 3 hadoop-common
).