Кафка + структурированный поток Что делать, когда основной приемник данных не работает, но при этом все еще читает из кафки? - PullRequest
0 голосов
/ 10 сентября 2018

У меня есть потоковое потоковое приложение, которое читает данные из Kafka и записывает их в базу данных, используя метод foreach, как показано ниже. Однако я не знаю, что произойдет, если мое приложение spark не сможет подключиться к базе данных для записи обработанных сообщений Kafka. Есть ли возможный способ остановить обработку этого искрового приложения, когда оно не может выполнить вставку базы данных или сохранить смещение Кафки, из которого оно не может вставить сообщение в базу данных?

Или есть другое решение?

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

val df = data_stream
    .select($"value".cast("string") as "json")

val Writer = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = {
    //conect to my database (Kudu in this case)
  }
  def process(record: Row) = {
    //insert records into kudu
  }
  def close(errorOrNull: Throwable): Unit = {}
}

df.get.writeStream
  .foreach(writer)
  .start()
  .awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...