В искровой структурированной потоковой передаче есть ли способ перевести операцию чтения во время периода обслуживания базы данных - PullRequest
0 голосов
/ 24 апреля 2020

Я разрабатываю задание потоковой передачи с искровым структурированием, которое читает из топологии Kafka c и записывает в базу данных Jdb c.

Предполагается, что в базе данных есть окно обслуживания, и я пытаюсь найти способ обработки дела без прерывания работы.

Мой код:

// read data from kafka and transform into required DF.
val transformDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.kafkaBootstrapServers)
  .option("startingOffsets", "latest")
  .option("subscribePattern", config.topics)
  .load()
  .transform(toRaw)

//write
val query = transformDF
  .writeStream
  .option("checkpointLocation", config.checkpointLocation)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
   batchDF.write
  .format("jdbc")
  .option("url", config.url)
  .option("user", config.username)
  .option("password", config.password)
  .option(JDBCOptions.JDBC_TABLE_NAME, tableName.get)
  .option("stringtype", "unspecified")
  .mode(SaveMode.Append)
  .save()
    })
  }.outputMode(OutputMode.Append()).start()

try {
  query.awaitTermination()
} catch {

  case e: Exception => logger.error("Error", e)
}

Прямо сейчас, если БД недоступна, код переходит в блок исключений и прерывается. Я хочу избежать этого, вместо этого я хочу, чтобы дальнейшее чтение сообщений было остановлено. Я пытаюсь избежать ручного повторного представления задания.

Возможно ли это?

Spark: 2.4.5

1 Ответ

0 голосов
/ 30 апреля 2020

Нет, нет надежного способа. Кстати, нет, это тоже ответ.

Logi c для проверки исключений, как правило, выполняется с помощью команды try / catch, работающей на драйвере, и выполняется так, как вы ее кодировали. Это принятая парадигма и логика, которую можно утверждать. Но этот подход является ошибкой, генерируемой драйвером.

Поскольку непредвиденные ситуации на уровне исполнителя уже стандартно обрабатываются самой Spark Framework для структурированной потоковой передачи, и если ошибка не может быть исправлена, то приложение / задание просто аварийно завершает работу после того, как сигнализирует об ошибке (ях) обратно драйвер, если вы не пишете код try / catch в различных конструкциях foreachXXX. Тем не менее, не ясно, что микропакет будет извлекаемым при таком подходе, на самом деле, некоторая часть микропартии, скорее всего, потеряна. Трудно проверить, хотя.

Учитывая, что в Spark стандартно предусмотрены вещи, к которым вы не можете подключиться, с какой стати можно вставить al oop / try / catch в исходный текст программы? Точно так же возникает проблема с широковещательными переменными, хотя, как говорят, у некоторых есть методы, связанные с этим. Но это не в духе рамок.

Итак, хороший вопрос, как мне интересно (ред.) Об этом (в прошлом).

...