Написание структуры Spark Потоковая передача данных в Cassandra - PullRequest
0 голосов
/ 26 апреля 2018

Я хочу записать данные потоковой передачи структуры в Cassandra с помощью API Pyspark.

Мой поток данных выглядит следующим образом:

Nifi -> Kafka -> Потоковая структура Spark -> Cassandra

Я попробовал следующий способ:

query = df.writeStream\
  .format("org.apache.spark.sql.cassandra")\
  .option("keyspace", "demo")\
  .option("table", "test")\
  .start()

Но сообщение об ошибке ниже: "org.apache.spark.sql.cassandra" не поддерживает потоковую запись.

Также я попробовал другой подход: [ Источник - Руководство администратора DSE 6.0]

query = df.writeStream\
   .cassandraFormat("test", "demo")\
   .start()

Но получил исключение: AttributeError: объект DataStreamWriter имеетбез атрибута 'cassandraFormat'

Может кто-нибудь подсказать мне, как мне действовать дальше?

Заранее спасибо.

Ответы [ 3 ]

0 голосов
/ 30 апреля 2018

После обновления DSE 6.0 (последняя версия) я могу записывать структурированные потоковые данные в Cassandra.[Spark 2.2 и Cassandra 3.11]

Ссылочный код:

query = fileStreamDf.writeStream\
 .option("checkpointLocation", '/tmp/check_point/')\
 .format("org.apache.spark.sql.cassandra")\
 .option("keyspace", "analytics")\
 .option("table", "test")\
 .start()

URL документации DSE: https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/structuredStreaming.html

0 голосов
/ 20 марта 2019

Этот ответ предназначен для записи данных в Cassandra, а не в DSE (, который поддерживает структурированную потоковую передачу для хранения данных )

Для Spark 2.4.0 и более поздних версий вы можете использовать метод foreachBatch,который позволяет использовать средство записи пакетных данных Cassandra, предоставляемое Spark Cassandra Connector, для записи результатов каждой микропакета потокового запроса в Cassandra:

import org.apache.spark.sql.cassandra._

df.writeStream
  .foreachBatch { (batchDF, _) => 
    batchDF
     .write
     .cassandraFormat("tableName", "keyspace")
     .mode("append")
     .save
  }.start

Для версий Spark ниже 2.4.0,вам нужно реализовать приемник foreach.

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.Statement
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

class CassandraSink(sparkConf: SparkConf) extends ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true

    def process(row: Row) = {
      def buildStatement: Statement =
        QueryBuilder.insertInto("keyspace", "tableName")
          .value("key", row.getAs[String]("value"))
      CassandraConnector(sparkConf).withSessionDo { session =>
        session.execute(buildStatement)
      }
    }

    def close(errorOrNull: Throwable) = Unit
}

И тогда вы можете использовать приемник foreach следующим образом:

df.writeStream
 .foreach(new CassandraSink(spark.sparkContext.getConf))
 .start
0 голосов
/ 26 апреля 2018

Здесь вы мало что можете сделать, кроме:

  • Следование (и голосование за) соответствующее JIRA .
  • Реализация необходимой функциональности и открытие PR самостоятельно.

Кроме этого, вы можете просто создать использование foreach раковина и писать напрямую.

...