Я пытаюсь реализовать новый источник данных в Spark с использованием API DataSource V2.
В моем InputPartitionReader
мне нужна конфигурация Had oop, поэтому я делаю:
val conf = SparkSession.builder().getOrCreate().sparkContext.hadoopConfiguration
Это приводит к прерыванию задания с помощью:
org.apache.spark.SparkException: A master URL must be set in your configuration
Другие задания Spark работают нормально.
Я что-то упустил или Configuration
получен по-другому? Нужно ли как-то сериализовать его и передать его из DefaultSource
?
Followup: обходной путь
Я пока не публикую это как ответ, потому что я думаю, что этот подход более взлом или обходной путь, чем реальное решение.
Поскольку Configuration
равен Writable
, в начале DefaultSource
я его сериализую:
val confBytes =
(for {
baos <- resource.managed(new ByteArrayOutputStream())
dos <- resource.managed(new DataOutputStream(baos))
} yield {
spark.sparkContext.hadoopConfiguration.write(dos)
dos.flush()
baos.flush()
baos.toByteArray
}).map(x => x).tried.get
Затем я передать байтовый массив до моих InputPartitionReader
с, где я десериализовал его так:
@transient private lazy val conf =
(for(dis <- resource.managed(new DataInputStream(new ByteArrayInputStream(confBytes)))) yield {
val conf = new Configuration()
conf.readFields(dis)
conf
}).map(x => x).tried.get
С этим обходным путем я могу затем использовать Had oop 'FileSystem
для чтения мои файлы.
resource.managed
исходит от scala -арма .
Теперь мой вопрос: Так ли это должно быть сделано? или есть более элегантное решение?