В моем Scala / Spark приложении я создаю DataFrame . Я планирую использовать этот Dataframe несколько раз по всей программе. Вот почему я решил использовать .cache()
метод для этого DataFrame. Как вы можете видеть внутри l oop, я фильтрую DataFrame несколько раз с разными значениями. По какой-то причине метод .count()
возвращает мне всегда один и тот же результат. Фактически он должен возвращать два разных значения счетчика. Также я замечаю странное поведение в Mesos . Такое ощущение, что метод .cache()
не выполняется. После создания DataFrame программа переходит к этой части кода if (!df.head(1).isEmpty)
и выполняет ее очень долго. Я предполагал, что процесс кэширования будет выполняться в течение длительного времени, а другие процессы будут использовать этот кэш и работать быстро. Как вы думаете, в чем проблема?
import org.apache.spark.sql.DataFrame
var df: DataFrame = spark
.read
.option("delimiter", "|")
.csv("/path_to_the_files/")
.filter(col("col5").isin("XXX", "YYY", "ZZZ"))
df.cache()
var array1 = Array("111", "222")
var array2 = Array("333")
var storage = Array(array1, array2)
if (!df.head(1).isEmpty) {
for (item <- storage) {
df.filter(
col("col1").isin(item:_*)
)
println("count: " + df.count())
}
}