Spark - ПРЕДУПРЕЖДЕНИЕ HDFSBackedStateStoreProvider: состояние для версии 1 не существует в загруженных картах - PullRequest
1 голос
/ 31 мая 2019

В настоящее время я работаю над заданием Spark Structured Streaming, и кажется, что на каждом интервале пакета я получаю предупреждение:

WARN HDFSBackedStateStoreProvider: The state for version N doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query. - где N увеличивается на каждую партию.

Я вижу это в журналах как в локальном режиме (где контрольные точки отключены), так и при работе на YARN (EMR).

Вопрос: можно ли это безопасно игнорировать?Включение ведения журнала отладки HDFSBackedStateStoreProvider указывает на то, что на чтение снимков и дельта-файлов уходит много времени, поэтому у меня есть некоторые опасения.

Вот мой, казалось бы, минимальный SparkConf

val sparkConf: SparkConf = {
      val conf = new SparkConf()
        .setAppName("Structured Streaming")
        .set("spark.sql.autoBroadcastJoinThreshold", "-1")
        .set("spark.speculation", "false")

      if (App.isLocal)
        conf
          .set("spark.cassandra.output.consistency.level", "LOCAL_ONE")
          .setMaster("local[*]")
      else
        conf
          .set("spark.cassandra.connection.host", PropertyLoader.getProperty("cassandra.contactPoints"))
          .set("spark.cassandra.connection.local_dc", PropertyLoader.getProperty("cassandra.localDC"))
          .set("spark.cassandra.connection.ssl.enabled", "true")
          .set("spark.cassandra.connection.ssl.trustStore.path", PropertyLoader.truststorePath)
          .set("spark.cassandra.connection.ssl.trustStore.password", PropertyLoader.getProperty("cassandra.truststorePassword"))
          .set("spark.cassandra.auth.username", PropertyLoader.getProperty("cassandra.username"))
          .set("spark.cassandra.auth.password", PropertyLoader.getProperty("cassandra.password"))
          .set("spark.executor.logs.rolling.maxRetainedFiles", "20")
          .set("spark.executor.logs.rolling.maxSize", "524288000")
          .set("spark.executor.logs.rolling.strategy", "size")
          .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
          .set("spark.sql.streaming.metricsEnabled", "true")
          .setJars(Array[String](SparkContext.jarOfClass(getClass).get))
...