кафка json массив потребителей scala - PullRequest
0 голосов
/ 02 апреля 2020

Если у нас есть JSON данные, полученные следующим образом,

[{"name":"Andy", "age":30}]
[{"name":"Romen", "age":20}]

Как использовать их для потребителя Kafka, используя scala. Я попытался прочитать его как .select (приведение (значение как String)). Но не работает. Пожалуйста, помогите

Я прочитал данные следующим образом:

val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:host")   
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest") 
      .load().selectExpr("CAST(value AS STRING)")

1 Ответ

0 голосов
/ 03 апреля 2020

Я просто создаю пример кода

def main(args: Array[String]): Unit = {

  val spark = SparkSession
    .builder
    .appName("Example")
    .master("local")
    .getOrCreate()

  val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

  val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .load()

  import spark.implicits._

  val result = df.selectExpr("CAST(value AS STRING)")
      .as[String]

  result.writeStream
      .format("console")
      .start()
      .awaitTermination()

    ssc.start()
    ssc.awaitTermination()

}

этот читает сообщение от Kafka и записывает его в консоль, если вы хотите больше информации, пожалуйста, проверьте здесь: https://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...