Десериализация Protobuf от Kafka в Spark структурированной потоковой передачи - PullRequest
0 голосов
/ 20 марта 2020

Я тестирую реализацию на работе, которая будет получать 300 миллионов сообщений в день, с планами огромного увеличения. В данный момент есть один шаг, который кажется неопрятным, и я буду признателен за несколько советов.

Я сделал удар по этому вопросу с https://scalapb.github.io/sparksql.html, но, похоже, не смог заставить его работать , даже следуя их инструкциям maven.

В настоящее время у меня есть protobuf и класс case для той же модели:

message MyThing { // proto
    required string id = 1;
}

case class MyThing(id: String)

Затем у меня искра readStream

val df =  
  spark.readStream
    .format("kafka")
    // etc
    .load()

Полезная нагрузка kafka находится в столбце «значение», который представляет собой массив [байт] из переданного протобуфа. Я хочу превратить этот двоичный столбец в строки со специфицированным c StructType.

То, что у меня сейчас есть, использует странный синтаксис, включающий класс дела:

val encoder = Encoder.product[MyThing]

df
  .select("value")
  .map { row => 
     // from memory so might be slightly off
     val proto = MyThingProto.parseFrom(row.getBinary(0)) 
     val myThing = MyThing.fromProto(proto)
     myThing
  }(encoder)
  .toDF()
  // business logic
  .writeStream
  ...//output

Могу ли я сделать это более эффективно / быстрее? Затраты на создание класса case кажутся чрезмерными. Я бы предпочел иметь возможность сделать что-то вроде этого:

  .map { row => 
     // from memory so might be slightly off
     val proto = MyThingProto.parseFrom(row.getBinary(0)) 
     val row = buildRow(proto)
     row
  }(encoder??) // what kind of encoder is used here?

  def buildRow(proto: MyThingProto): Row = 
      Row(proto.getId)

Было бы лучше? Или, может быть, UDF, который использует интерфейс десериализатора Kafka?

Заранее спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...