Spark Structured Streaming от kafka для сохранения данных в Cassandra в распределенном режиме - PullRequest
0 голосов
/ 04 февраля 2019

Я пытаюсь создать структурированный поток из Кафки в Spark, который представляет собой строку json.Теперь нужно проанализировать json в определенном столбце, а затем сохранить кадр данных в таблицу cassandra с оптимальной скоростью.Используя Spark 2.4 и cassandra 2.11 (Apache), а не DSE.

Я попытался создать Direct Stream, который дает DStream класса case, который я сохранял в Cassandra, используя foreachRDD на DStream, но это зависало после каждых 6-7 дней.Таким образом, была попытка выполнить потоковую передачу, которая напрямую выдает фрейм данных и может быть сохранена в Cassandra.

val conf = new SparkConf()
          .setMaster("local[3]")
      .setAppName("Fleet Live Data")
      .set("spark.cassandra.connection.host", "ip")
      .set("spark.cassandra.connection.keep_alive_ms", "20000")
      .set("spark.cassandra.auth.username", "user")
      .set("spark.cassandra.auth.password", "pass")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      .set("spark.executor.memory", "2g")
      .set("spark.driver.memory", "2g")
      .set("spark.submit.deployMode", "cluster")
      .set("spark.executor.instances", "4")
      .set("spark.executor.cores", "2")
      .set("spark.cores.max", "9")
      .set("spark.driver.cores", "9")
      .set("spark.speculation", "true")
      .set("spark.locality.wait", "2s")

val spark = SparkSession
  .builder
  .appName("Fleet Live Data")
  .config(conf)
  .getOrCreate()
println("Spark Session Config Done")

val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
 val topics = Map("livefleet" -> 1)
import spark.implicits._
implicit val formats = DefaultFormats

 val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerIP:port")
  .option("subscribe", "livefleet")
  .load()

val collection = df.selectExpr("CAST(value AS STRING)").map(f => parse(f.toString()).extract[liveevent])

val query = collection.writeStream
  .option("checkpointLocation", "/tmp/check_point/")
  .format("kafka")
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "trackfleet_db")
  .option("table", "locationinfotemp1")
  .outputMode(OutputMode.Update)
  .start()
  query.awaitTermination()

Ожидается, что фрейм данных будет сохранен в cassandra.Но получая эту ошибку: -

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: Запросы с потоковыми источниками должны выполняться с помощью writeStream.start ()

Ответы [ 2 ]

0 голосов
/ 15 февраля 2019

Если вы используете Spark 2.4.0, попробуйте использовать программу записи foreachbatch.Он использует средства записи на основе пакетов для потоковых запросов.

    val query= test.writeStream
       .foreachBatch((batchDF, batchId) =>
        batchDF.write
               .format("org.apache.spark.sql.cassandra")
               .mode(saveMode)
               .options(Map("keyspace" -> keySpace, "table" -> tableName))
               .save())
      .trigger(Trigger.ProcessingTime(3000))
      .option("checkpointLocation", /checkpointing")
      .start
   query.awaitTermination()
0 голосов
/ 07 февраля 2019

Исходя из сообщения об ошибке, я бы сказал, что Cassandra не является потоковым приемником, и я считаю, что вам нужно использовать .write

collection.write
    .format("org.apache.spark.sql.cassandra")
    .options(...)
    .save() 

или

import org.apache.spark.sql.cassandra._

// ...
collection.cassandraFormat(table, keyspace).save()

Документы:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#example-using-helper-commands-to-write-datasets


Но это может быть только для фреймов данных, для потоковых источников, см. этот пример , который использует .saveToCassandra

import com.datastax.spark.connector.streaming._

// ...
val wc = stream.flatMap(_.split("\\s+"))
    .map(x => (x, 1))
    .reduceByKey(_ + _)
    .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) 

ssc.start()

И еслиэто не работает, вам нужен ForEachWriter

collection.writeStream
  .foreach(new ForeachWriter[Row] {

  override def process(row: Row): Unit = {
    println(s"Processing ${row}")
  }

  override def close(errorOrNull: Throwable): Unit = {}

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }
})
.start()

Также стоит отметить, что Datastax выпустил Kafka Connector, и Kafka Connect включен в вашу установку Kafka (при условии 0.10.2) илипотом.Вы можете найти объявление здесь

...