Spark сохранить функциональность использовать MapReduce под капотом - PullRequest
0 голосов
/ 04 марта 2019

Я пытаюсь выяснить, почему saveAsText и более вообще Spark Функция сохранения, кажется, использует MapReduce под капотом.Это исходный код:

RDD.scala

  def saveAsTextFile(path: String): Unit = withScope {
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

PairRDDFunctions.scala

Таким образом, в основном преобразуйте данныеСДР в PairRDD для вызова функции saveAsHadoopFile:

def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {

    val hadoopConf = conf
    hadoopConf.setOutputKeyClass(keyClass)
    hadoopConf.setOutputValueClass(valueClass)
    conf.setOutputFormat(outputFormatClass)
    for (c <- codec) {
      hadoopConf.setCompressMapOutput(true)
      hadoopConf.set("mapred.output.compress", "true")
      hadoopConf.setMapOutputCompressorClass(c)
      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }

    if (conf.getOutputCommitter == null) {
      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
    }

    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    FileOutputFormat.setOutputPath(hadoopConf,
      SparkHadoopWriter.createPathFromString(path, hadoopConf))
    saveAsHadoopDataset(hadoopConf)
  }

Насколько я понимаю, здесь определенно пытаются настроить задание MapReduce, установить outputKey, outputValue и т. Д.

Может ли кто-нибудь объяснить мне:

  • , как происходит операция сохранения Spark
  • В чем основное различие между сохранением Spark и MapReduce

1 Ответ

0 голосов
/ 04 марта 2019

это определенно попытка настроить задание MapReduce, установка outputKey, outputValue и т. Д.

Не совсем.Это настройка конфигурации Hadoop, но это не значит, что она устанавливает задание MapReduce.Hadoop как таковой содержит несколько различных компонентов, и большое их количество не связано с MapReduce.Многие из них, такие как интерфейсы HDFS или компоненты безопасности, используются во многих различных проектах.

В чем главное отличие между Spark save и MapReduce save

Здесь нет ни одного.В целом, когда Spark взаимодействует с файловыми системами, он использует связанные компоненты Hadoop.Однако они не зависят от компонентов MapReduce и их не следует путать с заданиями Hadoop MR.

...