Я пытаюсь работать с json данными из веб-сокета binance.
Сейчас у меня есть схема, похожая на эту:
val schema = new StructType()
.add("e",StringType)
.add("E",StringType)
.add("s",StringType)
.add("k",StringType)
.add("t",IntegerType)
.add("T",IntegerType)
.add("s",StringType)
.add("i",StringType)
.add("f",StringType)
.add("L",StringType)
.add("o",DoubleType)
.add("c",DoubleType)
.add("h",DoubleType)
.add("l",DoubleType)
.add("v",DoubleType)
.add("n",IntegerType)
.add("x",StringType)
.add("q",DoubleType)
.add("V",DoubleType)
.add("Q",DoubleType)
.add("B",StringType)
И я получаю это сообщение от моего kafka topi c:
{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"T":1583595179999,"s":"BTCUSDT","i":"1m","f":47069029,"L":47069101,"o":"9111.22","c":"9114.90","h":"9114.91","l":"9109.65","v":"30.297","n":73,"x":false,"q":"276055.09390","V":"11.517","Q":"104946.56519","B":"0"}}
Как вы можете видеть, сообщение вложено под клавишей "k".
Мой вывод a в искре в данный момент выглядит следующим образом:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
-------------------------------------------
https://imgur.com/a/9LPu9z6
Изображение фрейма данных, так как я не смог вставить его в форум без разрушения фрейма.