Разбор JSON для структурированной потоковой передачи Spark - PullRequest
0 голосов
/ 12 февраля 2019

Я реализовал Spark Structured Streaming, и для моего случая использования я должен указать начальные смещения.

И у меня есть значения смещения в виде Array[String]:

{"topic":"test","partition":0,"starting_offset":123}
{"topic":"test","partition":1,"starting_offset":456}

Я бы хотел программно преобразовать его в приведенный ниже, чтобы я мог передать его Spark.

{"test": {"0": 123, "1": 456}}

Примечание. Это всего лишь пример, я продолжаю получать разные диапазоны смещения, поэтому не могу его жестко закодировать.

Ответы [ 2 ]

0 голосов
/ 12 февраля 2019
scala> import org.json4s._
scala> import org.json4s.jackson.JsonMethods._

scala> val topicAsRawStr: Array[String] = Array(
          """{"topic":"test","partition":0,"starting_offset":123}""",
          """{"topic":"test","partition":1,"starting_offset":456}""")

scala> val topicAsJSONs = topicAsRawStr.map(rawText => {
         val json = parse(rawText)
         val topicName = json  \ "topic"  // Extract topic value
         val offsetForTopic = json  \ "starting_offset"  // Extract starting_offset
         topicName -> offsetForTopic
       })
scala> // Aggregate offsets for each topic

Также можно использовать API spark.sparkContext.parallelize.

scala> case class KafkaTopic(topicName: String, partitionId: Int, starting_offset: Int)

scala> val spark: SparkSession = ???

scala> val topicAsRawStr: Array[String] = Array(
          """{"topic":"test","partition":0,"starting_offset":123}""",
          """{"topic":"test","partition":1,"starting_offset":456}""")

scala> val topicAsJSONs = topicAsRawStr.map(line => json.parse(line).extract[KafkaTopic])

scala> val kafkaTopicDS = spark.sparkContext.parallelize(topicAsJSONs)

scala> val aggregatedOffsetsByTopic = kafkaTopicDS
    .groupByKey("topic")
    .mapGroups {
        case (topicName, kafkaTopics) => 
           val offsets = kafkaTopics.flatMap(kT => kT.starting_offset)
           (topicName -> offsets.toSet)
    }
0 голосов
/ 12 февраля 2019

Если array - переменная, составляющая список, который вы описываете, то:

>>> [{d['topic']: [d['partition'], d['starting_offset']]} for d in array]
[{'test': [0, 123]}, {'test': [1, 456]}]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...