Как внедрить поток данных с массивом json в DataStream отдельных элементов массива - PullRequest
0 голосов
/ 29 мая 2019

У меня есть Datastream [ObjectNode], который я прочитал как десериализованный JSON из темы Кафки. Одним из элементов этого ObjectNode на самом деле является массив событий. Этот массив имеет переменную длину. Входящий поток JSON выглядит следующим образом:

{
    "eventType": "Impression",
    "deviceId": "359849094258487",
    "payload": {
        "vertical_name": "",
        "promo_layout_type": "aa",
        "Customer_Id": "1011851",
        "ecommerce": {
            "promoView": {
                "promotions": [{
                    "name": "/-category_icons_all",
                    "id": "300275",
                    "position": "slot_5_1",
                    "creative": "Central/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "300276",
                    "position": "slot_6_1",
                    "creative": "Lifestyle/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "413002",
                    "position": "slot_7_1",
                    "creative": "Uber/Deals/00000001B890D1739913DDA956AB5C79775991EC"
                }]
            }
        }
    }
}

Я хочу иметь возможность взорвать массив promotions, чтобы каждый элемент внутри становился отдельным сообщением, которое можно записать в тему приемника Кафки. Предоставляет ли Flink функцию разнесения в DataStream и / или Table API?

Я попытался создать RichFlatMap для этого потока, чтобы иметь возможность собирать отдельные строки, но это также просто возвращает мне DataStream [Seq [GenericRecord]], как показано ниже:

class PromoMapper(schema: Schema) extends RichFlatMapFunction[node.ObjectNode,Seq[GenericRecord]] {

  override def flatMap(value: ObjectNode, out: Collector[Seq[GenericRecord]]): Unit = {
    val promos = value.get("payload").get("ecommerce").get("promoView").get("promotions").asInstanceOf[Seq[node.ObjectNode]]

    val record = for{promo <- promos} yield {
      val processedRecord: GenericData.Record = new GenericData.Record(schema)
      promo.fieldNames().asScala.foreach(f => processedRecord.put(f,promo.get(f)))
      processedRecord
    }

    out.collect(record)
  }
}

Пожалуйста, помогите.

1 Ответ

0 голосов
/ 31 мая 2019

Использование плоской карты - правильная идея (не знаю, зачем вам надоел RichFlatMap, но это деталь).

Похоже, вы должны вызывать out.collect(processedRecord) для каждого элемента внутри цикла for, скореечем один раз на Seq, производимом этим циклом.

...