Источник данных io.pivotal.greenplum.spark.GreenplumRelationProvider не поддерживает потоковую запись - PullRequest
0 голосов
/ 04 апреля 2019

Я пытаюсь прочитать данные из kafka и загрузить их в базу данных greenplum с помощью spark. я использую соединитель greenplum-spark, но получаю источник данных io.pivotal.greenplum.spark.GreenplumRelationProvider не поддерживает потоковую запись. Это тот источник Greenplum не поддерживает потоковые данные? Я вижу на сайте надпись «Непрерывный конвейер ETL (потоковая передача)».

Я попытался указать источник данных как "greenplum" и "io.pivotal.greenplum.spark.GreenplumRelationProvider" в .format ("источник данных")

val EventStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", args(0))
  .option("subscribe", args(1))
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load

val gscWriteOptionMap = Map(
  "url" -> "link for greenplum",
  "user" -> "****",
  "password" -> "****",
  "dbschema" -> "dbname"
)
val stateEventDS = EventStream
  .selectExpr("CAST(key AS String)", "*****(value)")
  .as[(String, ******)]
  .map(_._2)

val EventOutputStream = stateEventDS.writeStream
  .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
  .options(gscWriteOptionMap)
  .start()

assetEventOutputStream.awaitTermination()

Ответы [ 2 ]

1 голос
/ 04 апреля 2019

Какую версию GPDB / Spark вы используете?Вы можете обойти искру в пользу разъема Greenplum-Kafka.

https://gpdb.docs.pivotal.io/5170/greenplum-kafka/overview.html

В более ранних версиях коннектор Greenplum-Spark открывал источник данных Spark с именем io.pivotal.greenplum.spark.GreenplumRelationProvider для чтения данных из базы данных Greenplum в Spark DataFrame.

В более поздних версиях соединитель предоставляет источник данных Spark с именем greenplum для передачи данных между Spark и базой данных Greenplum.

Должно быть что-то вроде -

val EventOutputStream = stateEventDS.write.format ("greenplum") .options (gscWriteOptionMap) .save ()

См .: https://greenplum -spark.docs.pivotal.io / 160 / write_to_gpdb.html

0 голосов
/ 04 апреля 2019

Greenplum Spark Structured Streaming

Демонстрирует использование API writeStream с GPDB с использованием JDBC

Следующий блок кода читает с использованием источника потока скорости и использует приемник на основе JDBC для потоковой передачи пакетов вGPDB

потоковая передача на основе пакетов

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

import scala.concurrent.duration._

val sq = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("myjdbc").
  option("checkpointLocation", "/tmp/jdbc-checkpoint").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

потоковая передача на основе записей

При этом используется ForeachWriter

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

import scala.concurrent.duration._

val url="jdbc:postgresql://gsc-dev:5432/gpadmin"
val user ="gpadmin"
val pwd = "changeme"
val jdbcWriter = new JDBCSink(url,user, pwd)

val sq = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format(jdbcWriter).
  option("checkpointLocation", "/tmp/jdbc-checkpoint").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start
...