Как я понимаю, что в Spark используется кеширование? - PullRequest
0 голосов
/ 27 января 2020

В моем Scala / Spark приложении я создаю DataFrame . Я планирую использовать этот Dataframe несколько раз по всей программе. Вот почему я решил использовать .cache() метод для этого DataFrame. Как вы можете видеть внутри l oop, я фильтрую DataFrame несколько раз с разными значениями. По какой-то причине метод .count() возвращает мне всегда один и тот же результат. Фактически он должен возвращать два разных значения счетчика. Также я замечаю странное поведение в Mesos . Такое ощущение, что метод .cache() не выполняется. После создания DataFrame программа переходит к этой части кода if (!df.head(1).isEmpty) и выполняет ее очень долго. Я предполагал, что процесс кэширования будет выполняться в течение длительного времени, а другие процессы будут использовать этот кэш и работать быстро. Как вы думаете, в чем проблема?

import org.apache.spark.sql.DataFrame

var df: DataFrame = spark
    .read
    .option("delimiter", "|")
    .csv("/path_to_the_files/")
    .filter(col("col5").isin("XXX", "YYY", "ZZZ"))

df.cache()

var array1 = Array("111", "222")

var array2 = Array("333")

var storage = Array(array1, array2)

if (!df.head(1).isEmpty) {
    for (item <- storage) {
        df.filter(
            col("col1").isin(item:_*)
        )

        println("count: " + df.count())
    }
}

1 Ответ

2 голосов
/ 27 января 2020

Фактически он должен возвращать два разных значения счетчика.

Почему? Вы звоните на тот же df. Возможно, вы имели в виду что-то вроде

val df1 = df.filter(...)
println("count: " + df1.count())

Я предполагал, что процесс кэширования будет выполняться в течение длительного времени, а другие процессы будут использовать этот кэш и работать быстро.

Да, но только когда выполняется первое действие , которое зависит от этого кадра данных, и head является этим действием. Таким образом, вы должны ожидать ровно

программа переходит к этой части кода if (!df.head(1).isEmpty) и выполняет ее очень долго

Без кеширования вы также получите одно и то же время для вызовов df.count(), если только Spark не обнаружит их и не включит кэширование самостоятельно.

...