Определение DataFrame - ленивая оценка - PullRequest
1 голос
/ 30 марта 2020

Я новичок в зажигании и изучении этого. Может ли кто-нибудь помочь с приведенным ниже вопросом

Цитата в окончательном искре относительно определения информационного кадра: «В общем, Spark не будет работать только во время выполнения задания, а не во время определения DataFrame - даже если, например, мы указываем на файл этого не существует. Это из-за ленивых вычислений: "

, поэтому я думаю, spark.read.format().load() - это определение фрейма данных. Вдобавок к этому созданному фрейму данных мы применяем преобразования и действия, а загрузка - это чтение API, а не преобразование, если я не ошибаюсь.

Я пытался "загрузить файл, который не существует" при загрузке, и я думаю, что это определение фрейма данных. но я получил ошибку ниже. согласно книге это не должно подвести, верно? Я, безусловно, что-то упустил. Может ли кто-нибудь помочь в этом?

df=spark.read.format('csv')
.option('header', 
'true').option('inferschema', 'true')
.load('/spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv')

Ошибка

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 166, in load
    return self._df(self._jreader.load(path))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Path does not exist: /spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv;' 

Почему определение фрейма данных ссылается на метаданные oop, когда вычисляется ленивый?

Ответы [ 2 ]

1 голос
/ 30 марта 2020

Пока здесь не определен датафрейм и не создан объект считывателя.

scala> spark.read.format("csv").option("header",true).option("inferschema",true)
res2: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@7aead157

, когда вы на самом деле говорите «загрузить».

res2.load('/spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv') и файл не существует ...... - время выполнения. (это означает, что он должен проверить источник данных и затем загрузить данные из csv)

Чтобы получить фрейм данных, его метаданные проверки имели oop, так как он будет проверьте hdfs, существует ли этот файл.

Это не значит, что вы получаете

org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://203-249-241:8020/spark_df_data/Spark-The-Definitive-Guide/data/retail-data/by-day/2011-12-19.csv

В общем

1) Линия RDD / DataFrame будет создана и не будет Быть выполненным, это время определения. 2) когда будет выполнена загрузка, это будет время выполнения.

См. Приведенный ниже поток, чтобы лучше понять.

enter image description here

Вывод: Любая траформация (время определения на вашем пути) не будет выполняться, пока не будет вызвано действие (время выполнения на вашем пути)

1 голос
/ 30 марта 2020

Искра - это ленивая эволюция. Тем не менее, это не означает, что он не может проверить, существует ли файл при загрузке.

Ленивая эволюция происходит с объектом DataFrame, и для создания объекта DataFrame необходимо сначала проверить, существует ли файл.

Проверьте следующий код .

@scala.annotation.varargs
  def load(paths: String*): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
      val catalogManager = sparkSession.sessionState.catalogManager
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        source = provider, conf = sparkSession.sessionState.conf)
      val pathsOption = if (paths.isEmpty) {
        None
      } else {
        val objectMapper = new ObjectMapper()
        Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
      }
...