Я пытаюсь понять интеграцию спарк-кафки и написал пример задания, которое выполняет следующее:
- Считывание значений из PostGreSQL БД из заданной отметки времени.
- Записать данные в кафку.
- Записать последнюю отметку времени в PostGreSQL, чтобы при следующем выполнении эта отметка времени учитывалась.
Код:
package com.r2d2.spark.db
import java.sql.{DriverManager, ResultSet}
import java.time.LocalDateTime
import org.apache.spark.sql.SparkSession;
object TestJob extends App {
val spark = SparkSession.builder.master("local[3]").appName("TestData").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val startTime = System.currentTimeMillis();
import org.apache.spark.sql.functions.{to_json, struct, col, lit, max}
val createDF =
spark.read.format("jdbc").option("url","jdbc:postgresql://test:5432/analytics")
.option("user", "test")
.option("password", "test")
.option("query", "select key, value from test.sample_table_2 where updated_at >= (select updated_at from test.sample_table order by updated_at desc 1) order by id desc limit 2")
.load()
createDF.selectExpr("CAST(uid AS STRING) AS key", "CAST(value AS STRING) AS value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test2")
.save()
val date = createDF.agg(max("updated_at")).alias("updated_at").collect()
.map(_.getTimestamp(0)).mkString
val connection = DriverManager.getConnection("jdbc:postgresql://test:5432/analytics",
"test", "test")
connection.setAutoCommit(true)
val statement = connection.createStatement()
statement.execute(s"update test.sample_table set updated_at = '${date}' where id = 1")
}
У меня вопрос: если во время записи в Kafka, если кластер Kafka выйдет из строя, будет ли половина сообщений записана в Kafka, или это будет все или ни одного.
По сути, я хочу убедиться, что все сообщения записываются в Kafka, а затем обновляется только временная метка в БД.