Чтение списка входных текстовых файлов, где отдельные имена файлов содержат запятые - PullRequest
0 голосов
/ 23 мая 2018

У меня есть папка на HDFS, в которой по какой-то причине содержатся неполные файлы с запятыми в имени.Например,

hdfs://namespace/mypath/1-1,123
hdfs://namespace/mypath/1-2,124
hdfs://namespace/mypath/1-3,125

Проблема в том, что я хочу читать только некоторые файлы деталей за один раз, чтобы предотвратить перегрузку моего кластера, что означает, что я хочу прочитать 1-1,123 и 1-2,124файлы.

Тем не менее, когда путь подается на искру как:

sc.textFile("hdfs://namespace/mypath/1-1,123,hdfs://namespace/mypath/1-2,124")

Очевидно, что Spark просто токенизирует ",", предполагая, что я ищу 4 отдельных файла.

  • Есть ли способ избежать запятых в пути?
  • Является ли единственная возможность переименовать исходные файлы?

1 Ответ

0 голосов
/ 24 мая 2018

SparkContext.textFile вызывает в некоторой точке FileInputFormat.setInputPaths(Job job, String commaSeparatedPaths), которая, очевидно, просто разделяется на , вход String, представляющий пути через запятую:

Устанавливает заданные пути, разделенные запятыми, в качестве списка входных данных для задания уменьшения карты.

Один из способов обойти это ограничение состоит в использовании альтернативной сигнатуры setInputPaths: FileInputFormat.setInputPaths(Job job, Path... inputPaths) который принимает vararg из Path объектов.Таким образом, нет необходимости разбивать на , и, таким образом, путаница невозможна.

Для этого нам нужно будет создать наш собственный метод textFile, который будет действовать точно так же, как SparkContext.textFile:вызов объекта HadoopRDD, но на этот раз с использованием ввода, предоставленного в виде List из String с вместо String:

package org.apache.spark

import org.apache.spark.rdd.{RDD, HadoopRDD}
import org.apache.spark.util.SerializableConfiguration
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.fs.Path

object TextFileOverwrite {

  implicit class SparkContextExtension(val sc: SparkContext) extends AnyVal {

    def textFile(
        paths: Seq[String],
        minPartitions: Int = sc.defaultMinPartitions
    ): RDD[String] = {

      val confBroadcast =
        sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

      val setInputPathsFunc =
        (jobConf: JobConf) =>
          FileInputFormat.setInputPaths(jobConf, paths.map(p => new Path(p)): _*)

      new HadoopRDD(
        sc,
        confBroadcast,
        Some(setInputPathsFunc),
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        minPartitions
      ).map(pair => pair._2.toString)
    }
  }
}

, который можно использовать следующим образом:

import org.apache.spark.TextFileOverwrite.SparkContextExtension

sc.textFile(Seq("path/hello,world.txt", "path/hello_world.txt"))

По сравнению с SparkContext.textFile единственное отличие в реализации - это вызов FileInputFormat.setInputPaths, который вводит Path s вместо запятой String.

Обратите внимание, что я использую пакет org.apache.spark для хранения этой функции, потому что SerializableConfiguration имеет видимость private[spark] в базе кода искры.

Также обратите внимание на использование implicit class в SparkContext, котороепозволяет нам неявно присоединять этот дополнительный метод textFile непосредственно к объекту SparkContext и, следовательно, вызывать его, используя sc.textFile() вместо того, чтобы передавать sparkContext в качестве параметра метода.

Такжеобратите внимание, что я бы предпочел дать Seq[Path] вместо Seq[String] в качестве ввода этого метда, но Path еще не Serializable в текущей версии hadoop-common, используемой Spark (она станет Serializable начиная с версии 3 hadoop-common).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...