API Spark DataSource V2: как получить конфигурацию oop в InputPartitionReader? - PullRequest
0 голосов
/ 26 марта 2020

Я пытаюсь реализовать новый источник данных в Spark с использованием API DataSource V2.

В моем InputPartitionReader мне нужна конфигурация Had oop, поэтому я делаю:

val conf = SparkSession.builder().getOrCreate().sparkContext.hadoopConfiguration

Это приводит к прерыванию задания с помощью:

org.apache.spark.SparkException: A master URL must be set in your configuration

Другие задания Spark работают нормально.

Я что-то упустил или Configuration получен по-другому? Нужно ли как-то сериализовать его и передать его из DefaultSource?

Followup: обходной путь

Я пока не публикую это как ответ, потому что я думаю, что этот подход более взлом или обходной путь, чем реальное решение.

Поскольку Configuration равен Writable, в начале DefaultSource я его сериализую:

val confBytes =
  (for {
    baos <- resource.managed(new ByteArrayOutputStream())
    dos <- resource.managed(new DataOutputStream(baos))
  } yield {
    spark.sparkContext.hadoopConfiguration.write(dos)
    dos.flush()
    baos.flush()
    baos.toByteArray
  }).map(x => x).tried.get

Затем я передать байтовый массив до моих InputPartitionReader с, где я десериализовал его так:

@transient private lazy val conf =
  (for(dis <- resource.managed(new DataInputStream(new ByteArrayInputStream(confBytes)))) yield {
    val conf = new Configuration()
    conf.readFields(dis)
    conf
  }).map(x => x).tried.get

С этим обходным путем я могу затем использовать Had oop 'FileSystem для чтения мои файлы.

resource.managed исходит от scala -арма .

Теперь мой вопрос: Так ли это должно быть сделано? или есть более элегантное решение?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...