Как установить переменные в предложении «Где» при чтении таблицы cassandra с помощью потоковой передачи с искрой? - PullRequest
0 голосов
/ 29 мая 2019

Я делаю некоторую статистику, используя потоковое искрение и кассандру.При чтении таблиц cassandra с помощью spark-cassandra-connector и создании строки RDD Cassandra для DStreamRDD с помощью ConstantInputDStream переменная «CurrentDate» в предложении where остается в тот же день, когда запускается программа.

Цель состоит в том, чтобыанализировать общую оценку по некоторым измерениям до текущей даты, но теперь код выполняет анализ только до того дня, когда он начнет выполняться.Я запускаю код в 2019-05-25, и данные, вставленные в таблицу после этого времени, не могут быть приняты.

Код, который я использую, выглядит следующим образом:

  class TestJob extends Serializable {

  def test(ssc : StreamingContext) : Unit={

    val readTableRdd = ssc.cassandraTable(Configurations.getInstance().keySpace1,Constants.testTable)
      .select(
        "code",
        "date",
        "time",
        "score"
      ).where("date<= ?",new Utils().getCurrentDate())

    val DStreamRdd = new ConstantInputDStream(ssc,readTableRdd)

    DStreamRdd.foreachRDD{r=>
    //DO SOMETHING
    }
  }
}

      object GetSSC extends Serializable {
      def getSSC() : StreamingContext ={
        val conf = new SparkConf()
          .setMaster(Configurations.getInstance().sparkHost)
          .setAppName(Configurations.getInstance().appName)
          .set("spark.cassandra.connection.host", Configurations.getInstance().casHost)
          .set("spark.cleaner.ttl", "3600")
          .set("spark.default.parallelism","3")
          .set("spark.ui.port","5050")
          .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        @transient lazy val ssc = new StreamingContext(sc,Seconds(30))
        ssc
      }
    }

    object Main {
    val logger : Log = LogFactory.getLog(Main.getClass)
    def main(args : Array[String]) : Unit={
    val ssc = GetSSC.getSSC()
    try{
      new TestJob().test(ssc)
      ssc.start()
      ssc.awaitTermination()
    }catch {
      case e : Exception =>
        logger.error(Main.getClass.getSimpleName+"error : 
    "+e.printStackTrace())
    }
  }
}

Таблица, используемая в этомДемо как:

    CREATE TABLE test.test_table (
       code text PRIMARY KEY, //UUID
       date text, // '20190520'
       time text, // '12:00:00'
       score int); // 90

Любая помощь приветствуется!

1 Ответ

1 голос
/ 29 мая 2019

Как правило, RDD, возвращаемые Spark Cassandra Connector, не являются потоковыми RDD - в Cassandra нет такой функциональности, которая позволяла бы подписываться на ленту изменений и анализировать ее. Вы можете реализовать что-то вроде явного зацикливания и извлечения данных, но это потребует тщательного проектирования таблиц, но сложно что-то сказать, не углубляясь в требования к задержке, объему данных и т. Д.

...