Я тестирую реализацию на работе, которая будет получать 300 миллионов сообщений в день, с планами огромного увеличения. В данный момент есть один шаг, который кажется неопрятным, и я буду признателен за несколько советов.
Я сделал удар по этому вопросу с https://scalapb.github.io/sparksql.html, но, похоже, не смог заставить его работать , даже следуя их инструкциям maven.
В настоящее время у меня есть protobuf и класс case для той же модели:
message MyThing { // proto
required string id = 1;
}
case class MyThing(id: String)
Затем у меня искра readStream
val df =
spark.readStream
.format("kafka")
// etc
.load()
Полезная нагрузка kafka находится в столбце «значение», который представляет собой массив [байт] из переданного протобуфа. Я хочу превратить этот двоичный столбец в строки со специфицированным c StructType.
То, что у меня сейчас есть, использует странный синтаксис, включающий класс дела:
val encoder = Encoder.product[MyThing]
df
.select("value")
.map { row =>
// from memory so might be slightly off
val proto = MyThingProto.parseFrom(row.getBinary(0))
val myThing = MyThing.fromProto(proto)
myThing
}(encoder)
.toDF()
// business logic
.writeStream
...//output
Могу ли я сделать это более эффективно / быстрее? Затраты на создание класса case кажутся чрезмерными. Я бы предпочел иметь возможность сделать что-то вроде этого:
.map { row =>
// from memory so might be slightly off
val proto = MyThingProto.parseFrom(row.getBinary(0))
val row = buildRow(proto)
row
}(encoder??) // what kind of encoder is used here?
def buildRow(proto: MyThingProto): Row =
Row(proto.getId)
Было бы лучше? Или, может быть, UDF, который использует интерфейс десериализатора Kafka?
Заранее спасибо.