directKafkaStream to Spark DataFrame - PullRequest
       12

directKafkaStream to Spark DataFrame

0 голосов
/ 28 декабря 2018

Мне удалось распечатать вывод.Но я хочу записать их в Spark DataFrame и затем вставить их в таблицу.

Ниже приведен мой код потребителя

public class SparkAvroConsumer {
  private static Injection<GenericRecord, byte[]> recordInjection;

  static {
      Schema.Parser parser = new Schema.Parser();
      Schema schema = parser.parse(UserSchema.getUserSchema());
      recordInjection = GenericAvroCodecs.toBinary(schema);
  }

public static void main(String[] args) throws InterruptedException {

    SparkConf conf = new SparkConf()
            .setAppName("JavaWordCountCon")
            .setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    String consumeGroup = "cg1";
    Database_Conn conn = new Database_Conn();

    Set<String> topics = Collections.singleton("Kafka_Example");
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("enable.auto.commit", "false");
    kafkaParams.put("auto.commit.interval.ms", "101");
    kafkaParams.put("group.id", consumeGroup);
    kafkaParams.put("max.partition.fetch.bytes", "135");

    JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
            String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

    directKafkaStream
            .map(message -> recordInjection.invert(message._2).get())
            .foreachRDD(rdd -> {
                rdd.foreach(record -> {
                    System.out.println(record);
                });
            });

    ssc.start();
    ssc.awaitTermination();
}}

1 Ответ

0 голосов
/ 29 декабря 2018

Пример кода:

создание таблицы людей в базе данных someDatabase:

create table people(name Varchar(100), age Int);

Запуск приложения

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}


object JdbcSampleApp extends App {

  case class Person(name: String, age: Int)

  import org.apache.spark.sql.functions._

  val session = SparkSession.builder.master("local[2]")
    .appName("NetworkWordCount").config("spark.driver.host", "localhost").getOrCreate()


  import session.implicits._
  val df: DataFrame = session.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "people")
    .load()

  val schema = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]


  val people = df.selectExpr(s"CAST(value AS STRING) AS json")
    .select(from_json($"json", schema) as "data")
    .select("data.*").as[Person].map(p => p.copy(age = p.age+7))

  people.printSchema()

  val sQuery = people.writeStream.trigger(Trigger.ProcessingTime("10 second")).
    foreachBatch((peopleDataSet: Dataset[Person], n: Long) => {
    peopleDataSet.write.format("jdbc")
      .mode(SaveMode.Append)
      .option(JDBCOptions.JDBC_URL, "jdbc:postgresql://localhost:6543/someDatabase?user=username&password=secret")
      .option(JDBCOptions.JDBC_TABLE_NAME, "people")
      .option(JDBCOptions.JDBC_DRIVER_CLASS, "org.postgresql.Driver")
      .save()
  }
  ).start()

  sQuery.awaitTermination(60000)
}

Отправка сообщений kafka

$KAFKA_HOME/kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic people \
  --property "parse.key=true" \
  --property "key.separator=_"

Примеры сообщений:

4_{"name": "Johny", "age": 31}
1_{"name": "Ronny", "age": 34}
...