Поведение Spark Dataframe.cache () для изменения источника - PullRequest
0 голосов
/ 17 октября 2018

Мой вариант использования:

  1. Создание кадра данных из таблицы cassandra.
  2. Создание выходного кадра данных путем фильтрации по столбцу и изменение значения этого столбца.
  3. Записать выходной кадр данных в cassandra с набором TTL, чтобы все измененные записи были удалены через короткий промежуток времени (2 с)
  4. Вернуть выходной кадр данных вызывающей стороне, которая через некоторое время записывает его в файловую систему.Я могу только вернуть датафрейм вызывающей стороне, и у меня нет дальнейшего контроля.Кроме того, я не могу увеличить TTL.

Ко времени выполнения шага 4 выходной кадр данных пуст.Это связано с тем, что spark переоценивает фрейм данных в действии, и из-за происхождения происходит повторный запрос кассандры, который теперь не дает записей.
Чтобы избежать этого, я добавил шаг после шага 2:

2a) outputDataframe.cache()

Это гарантирует, что на шаге 5 cassandra не запрашивается, и я также получаю нужные выходные записи в своем файле.У меня есть ниже запросы об этом подходе:

  1. Возможно ли, что в тех случаях, когда spark не находит кэшированные данные (сбой поиска в кеше), он пойдет по родословной и выполнит запрос cassandra?Если да, как можно избежать этого во всех случаях?
  2. Я видел другой способ кэширования: df.rdd.cache().Это отличается от вызова cache() на фрейме данных?

Для справки мой текущий код выглядит следующим образом:

//1
val dfOrig = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
      .load()
//2
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
//3
df.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
      .mode("append")
      .save()
//4
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv") 

1 Ответ

0 голосов
/ 17 октября 2018

Возможно ли, что в тех случаях, когда Spark не находит кэшированные данные (сбой при поиске в кэше), он поднимается по линии и запускает запрос Cassandra?

Да, это возможно.Кэшированные данные могут быть удалены очистителем кэша (в основном в режиме MEMORY_ONLY), могут быть потеряны при выводе из эксплуатации соответствующего узла (сбой, прерывание, освобождение при динамическом выделении).Кроме того, другие параметры, такие как спекулятивное выполнение, могут влиять на поведение кэша.

Наконец, данные могут не полностью кэшироваться с самого начала.

Если да, то как этого избежать ввсе случаи?

Не используйте cache / persist, если вам требуются строгие гарантии согласованности - он не был разработан с учетом вариантов использования, подобных этому.Вместо этого экспортируйте данные в постоянное надежное хранилище (например, HDFS) и читайте их оттуда.

Вы также можете использовать checkpoint с HDFS checkpointDir.

У вас может возникнуть желание использоватьболее надежный режим кэширования, такой как MEMORY_AND_DISK_2 - это может снизить вероятность повторного вычисления данных за счет

df.rdd.cache ().Отличается ли это от вызова cache () на фрейме данных?

Это отличается (основное отличие - стратегия сериализации), но не в том, что касается свойств, представляющих интерес в области действия.этого вопроса.

Важно :

Обратите внимание, что поведение кэширования может быть не самой большой проблемой в вашем коде.Чтение из одной таблицы и добавление ее в одну таблицу может привести к нежелательному или неопределенному поведению всех типов в сложных конвейерах, если только не предприняты дополнительные шаги, чтобы убедиться, что читатель не выбирает вновь записанные записи.

...