Пример кода:
создание таблицы людей в базе данных someDatabase:
-- Target table for the streaming JDBC sink (PostgreSQL).
CREATE TABLE people (name VARCHAR(100), age INT);
Запуск приложения:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
object JdbcSampleApp extends App {

  // Value type for the JSON payload carried in each Kafka record.
  case class Person(name: String, age: Int)

  import org.apache.spark.sql.functions._

  val session = SparkSession.builder.master("local[2]")
    .appName("NetworkWordCount").config("spark.driver.host", "localhost").getOrCreate()

  import session.implicits._

  // Stream of raw Kafka records from the `people` topic on localhost:9092.
  val df: DataFrame = session.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "people")
    .load()

  // Derive the Spark schema for Person via runtime reflection.
  val schema = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]

  // Kafka `value` is binary: cast it to a string, parse the JSON into a
  // Person, then bump each person's age by 7 (demo transformation).
  val people = df.selectExpr(s"CAST(value AS STRING) AS json")
    .select(from_json($"json", schema) as "data")
    .select("data.*").as[Person].map(p => p.copy(age = p.age + 7))

  people.printSchema()

  // Append every 10-second micro-batch to the `people` table over JDBC.
  // NOTE(review): credentials are embedded in the URL for the sake of the
  // example only — move them to external configuration in real code.
  val sQuery = people.writeStream.trigger(Trigger.ProcessingTime("10 second")).
    foreachBatch((peopleDataSet: Dataset[Person], n: Long) => {
      peopleDataSet.write.format("jdbc")
        .mode(SaveMode.Append)
        .option(JDBCOptions.JDBC_URL, "jdbc:postgresql://localhost:6543/someDatabase?user=username&password=secret")
        .option(JDBCOptions.JDBC_TABLE_NAME, "people")
        .option(JDBCOptions.JDBC_DRIVER_CLASS, "org.postgresql.Driver")
        .save()
    }
    ).start()

  // Run for at most 60 seconds, then release Spark resources explicitly
  // (the original leaked the session: `stop()` was never called).
  sQuery.awaitTermination(60000)
  session.stop()
}
Отправка сообщений в Kafka:
# Kafka ships its CLI scripts under bin/ — the original path omitted it.
# parse.key/key.separator let us type "key_value" lines, so each record
# gets an explicit key (the digit before "_" in the examples below).
$KAFKA_HOME/bin/kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic people \
  --property "parse.key=true" \
  --property "key.separator=_"
Примеры сообщений:
4_{"name": "Johny", "age": 31}
1_{"name": "Ronny", "age": 34}