спарк как я могу убедиться что все сообщения пишутся на кафку и тогда устанавливается только смещение - PullRequest
0 голосов
/ 31 января 2020

Я пытаюсь понять интеграцию спарк-кафки и написал пример задания, которое выполняет следующее:

  • Считывание значений из PostGreSQL БД из заданной отметки времени.
  • Записать данные в кафку.
  • Записать последнюю отметку времени в PostGreSQL, чтобы при следующем выполнении эта отметка времени учитывалась.

Код:

package com.r2d2.spark.db

import java.sql.{DriverManager, ResultSet}
import java.time.LocalDateTime
import org.apache.spark.sql.SparkSession;

object TestJob extends App {

val spark = SparkSession.builder.master("local[3]").appName("TestData").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val startTime = System.currentTimeMillis();

import org.apache.spark.sql.functions.{to_json, struct, col, lit, max}

val createDF = 
spark.read.format("jdbc").option("url","jdbc:postgresql://test:5432/analytics")
.option("user", "test")
.option("password", "test")
.option("query", "select key, value from test.sample_table_2 where updated_at >= (select updated_at from test.sample_table order by updated_at desc 1) order by id desc limit 2")
.load()

createDF.selectExpr("CAST(uid AS STRING) AS key", "CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test2")
  .save()

val date = createDF.agg(max("updated_at")).alias("updated_at").collect()
.map(_.getTimestamp(0)).mkString

val connection = DriverManager.getConnection("jdbc:postgresql://test:5432/analytics", 
"test", "test")
connection.setAutoCommit(true)
val statement = connection.createStatement()
statement.execute(s"update test.sample_table set updated_at = '${date}' where id = 1")
}

У меня вопрос: если во время записи в Kafka, если кластер Kafka выйдет из строя, будет ли половина сообщений записана в Kafka, или это будет все или ни одного.

По сути, я хочу убедиться, что все сообщения записываются в Kafka, а затем обновляется только временная метка в БД.

...