Spark Структурированная потоковая передача с кассандрой - PullRequest
1 голос
/ 28 апреля 2020

Я использую структурированную потоковую передачу Spark вместе с Cassandra в качестве приемника. Фрагмент ниже:

  override def start(): StreamingQuery = {
    sparkContext.getSparkSession()
      .readStream
      .option("header", "false")
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("failOnDataLoss","false")
      .option("subscribe", topicName)
      .load()
      .writeStream
      .option("checkpointLocation",checkpointLocation)
      .foreachBatch(forEachFunction.arceusForEachFunction(_,_))
      .start()

И я использую нижеследующее, чтобы написать Кассандре внутри foreach:

RDD.saveToCassandra(keyspace, tableName)

Пока я делал это, мне было интересно, как справляться с такими проблемами, как Кассандра, спускаясь и т.д. c. Предположим, что из 3М загружаемых данных было написано 2М до возникновения проблемы. Теперь мне нужно либо отменить 2М, либо обработать только 1М. Я не уверен, что произойдет, если такой сценарий ios.

Об этом как-то заботятся? Или я должен что-то написать, чтобы позаботиться об этом?

Я также посмотрел на spark docs , а для "foreach batch" написано "зависит от реализации "

enter image description here

Любая помощь приветствуется. Спасибо

1 Ответ

1 голос
/ 28 апреля 2020

Во-первых, если вы используете foreachBatch, вы можете просто написать фрейм данных как есть, без RDD (здесь пример ):

      .foreachBatch((df, batchId) =>
        df.write.cassandraFormat("sttest", "test")
          .mode(SaveMode.Append).save()
      )

Что касается восстановления - вы не могли отменить запись в Cassandra - это не транзакционная база данных, поэтому, если некоторые данные записываются, они записываются. Но в большинстве случаев запись должна быть идемпотентной (кроме случаев, когда вы используете операции со списками или LWT), и вы можете просто записать данные снова. Spark Cassandra Connector действительно пытается повторить операцию записи автоматически, если он обнаруживает, что узел не работает, поэтому вы должны быть охвачены этим.

PS Новая версия Spark Cassandra Connector (в настоящее время в альфа-версии) будет поддерживать собственную запись Spark Структурированная потоковая передача данных на Cassandra.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...