Spark Streaming jdbc читает поток как и когда поступают данные - Источник данных jdbc не поддерживает потоковое чтение - PullRequest
2 голосов
/ 02 июля 2019

Я использую PostGre в качестве базы данных.Я хочу захватить одну таблицу данных для каждого пакета и преобразовать его в файл паркета и сохранить в s3.Я попытался подключиться, используя параметры JDBC для spark и readStream, как показано ниже ...

val jdbcDF = spark.readStream
    .format("jdbc")
    .option("url", "jdbc:postgresql://myserver:5432/mydatabase")
    .option("dbtable", "database.schema.table")
    .option("user", "xxxxx")
    .option("password", "xxxxx")
    .load()

, но возникло неподдерживаемое исключение

Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
    at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
    at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)

Я на правильном пути?Неужели нет поддержки базы данных как источника данных для потоковой передачи искры?

AFAIK другой способ сделать это - написать производителя kafka, чтобы опубликовать данные в теме kafka, а затем использовать потоковую передачу ...

Примечание : Iя не хочу использовать kafka connect для этого, так как мне нужно выполнить некоторые вспомогательные преобразования.

Это единственный способ сделать это?

Как правильно это сделать?Есть ли пример для такой вещи?Пожалуйста, помогите!

...